http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
new file mode 100644
index 0000000..e1c0099
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -0,0 +1,858 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * A shared buffer implementation which stores values under a key. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ * <p>
+ * The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
+ * buffer page maintains a collection of the inserted values.
+ * <p>
+ * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry makes it possible
+ * to store relations between different entries. A dewey versioning scheme makes it possible to
+ * discriminate between different relations (e.g. preceding element).
+ * <p>
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ *
+ * @param <K> Type of the keys
+ * @param <V> Type of the values
+ */
+public class SharedBuffer<K extends Serializable, V> implements Serializable {
+       private static final long serialVersionUID = 9213251042562206495L;
+
+       private final TypeSerializer<V> valueSerializer;
+
+       private transient Map<K, SharedBufferPage<K, V>> pages;
+
+       /**
+        * Creates an empty shared buffer.
+        *
+        * @param valueSerializer Serializer which the custom serialization hooks of this class use
+        *                        to write and read the buffered values
+        */
+       public SharedBuffer(final TypeSerializer<V> valueSerializer) {
+               this.pages = new HashMap<>();
+               this.valueSerializer = valueSerializer;
+       }
+
+       /**
+        * Stores the given value together with its timestamp under the given key and links the
+        * resulting entry to its predecessor, which is identified by the previous key, value and
+        * timestamp. If no page exists for the key yet, a new page is created and registered.
+        *
+        * @param key Key of the current value
+        * @param value Current value
+        * @param timestamp Timestamp of the current value (a value always requires a timestamp to make it uniquely referable)
+        * @param previousKey Key of the value for the previous relation
+        * @param previousValue Value for the previous relation
+        * @param previousTimestamp Timestamp of the value for the previous relation
+        * @param version Version of the previous relation
+        */
+       public void put(
+                       final K key,
+                       final V value,
+                       final long timestamp,
+                       final K previousKey,
+                       final V previousValue,
+                       final long previousTimestamp,
+                       final DeweyNumber version) {
+               // look up the page of the key and create it on first use
+               SharedBufferPage<K, V> targetPage = pages.get(key);
+
+               if (targetPage == null) {
+                       targetPage = new SharedBufferPage<K, V>(key);
+                       pages.put(key, targetPage);
+               }
+
+               // null if nothing is stored under the previous key, value and timestamp
+               final SharedBufferEntry<K, V> predecessor = get(previousKey, previousValue, previousTimestamp);
+               final ValueTimeWrapper<V> wrappedValue = new ValueTimeWrapper<>(value, timestamp);
+
+               targetPage.add(wrappedValue, predecessor, version);
+       }
+
+       /**
+        * Checks whether the given key, value, timestamp triple is contained in the shared buffer.
+        *
+        * @param key Key of the value
+        * @param value Value
+        * @param timestamp Timestamp of the value
+        * @return Whether the value with the given timestamp is registered under the given key
+        */
+       public boolean contains(
+               final K key,
+               final V value,
+               final long timestamp) {
+               final SharedBufferPage<K, V> page = pages.get(key);
+
+               if (page == null) {
+                       // no page has been created for this key
+                       return false;
+               }
+
+               return page.contains(new ValueTimeWrapper<>(value, timestamp));
+       }
+
+       /**
+        * Checks whether the shared buffer holds no entries at all.
+        *
+        * @return True if every page of the buffer is empty (or there are no pages); otherwise false
+        */
+       public boolean isEmpty() {
+               final Iterator<SharedBufferPage<K, V>> pageIterator = pages.values().iterator();
+
+               while (pageIterator.hasNext()) {
+                       if (!pageIterator.next().isEmpty()) {
+                               // a single non-empty page is enough to decide
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
+       /**
+        * Lets every page prune its entries with respect to the given pruning timestamp and deletes
+        * all pages which are empty afterwards.
+        *
+        * @param pruningTimestamp The time which is used for pruning. It is handed to each page,
+        *                         which treats entries whose timestamp is lower than or equal to
+        *                         the pruning timestamp as expired.
+        */
+       public void prune(long pruningTimestamp) {
+               final Iterator<SharedBufferPage<K, V>> pageIterator = pages.values().iterator();
+
+               while (pageIterator.hasNext()) {
+                       final SharedBufferPage<K, V> page = pageIterator.next();
+
+                       page.prune(pruningTimestamp);
+
+                       // delete page if it is empty
+                       if (page.isEmpty()) {
+                               pageIterator.remove();
+                       }
+               }
+       }
+
+       /**
+        * Returns all elements from the previous relation starting at the given value with the
+        * given key and timestamp.
+        *
+        * The previous relations are followed with an iterative depth first search. A path is
+        * complete, and added to the result, as soon as an extraction state with a version of
+        * length 1 is reached. The entries of a complete path are popped from the path stack, so
+        * they are inserted into the multimap starting with the entry which was pushed last.
+        *
+        * @param key Key of the starting value
+        * @param value Value of the starting element
+        * @param timestamp Timestamp of the starting value
+        * @param version Version of the previous relation which shall be extracted
+        * @return Collection of previous relations starting with the given value. The collection is
+        *         empty if no entry is stored for the given key, value and timestamp.
+        */
+       public Collection<LinkedHashMultimap<K, V>> extractPatterns(
+               final K key,
+               final V value,
+               final long timestamp,
+               final DeweyNumber version) {
+               Collection<LinkedHashMultimap<K, V>> result = new ArrayList<>();
+
+               // stack to remember the current extraction states
+               Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
+
+               // get the starting shared buffer entry for the previous relation
+               SharedBufferEntry<K, V> entry = get(key, value, timestamp);
+
+               if (entry != null) {
+                       extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+
+                       // use a depth first search to reconstruct the previous relations
+                       while (!extractionStates.isEmpty()) {
+                               ExtractionState<K, V> extractionState = extractionStates.pop();
+                               DeweyNumber currentVersion = extractionState.getVersion();
+                               // current path of the depth first search
+                               Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+
+                               // termination criterion
+                               if (currentVersion.length() == 1) {
+                                       LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+
+                                       while (!currentPath.isEmpty()) {
+                                               SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+
+                                               completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+                                       }
+
+                                       result.add(completePath);
+                               } else {
+                                       // NOTE(review): edges may carry a null target; a null entry paired with a version
+                                       // longer than 1 would fail below at getEdges() - confirm the versions rule this out
+                                       SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
+
+                                       // append state to the path
+                                       currentPath.push(currentEntry);
+
+                                       boolean firstMatch = true;
+                                       for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
+                                               // we can only proceed if the current version is compatible to the version
+                                               // of this previous relation
+                                               if (currentVersion.isCompatibleWith(edge.getVersion())) {
+                                                       if (firstMatch) {
+                                                               // for the first match we don't have to copy the current path
+                                                               extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+                                                               firstMatch = false;
+                                                       } else {
+                                                               // every further match continues on its own copy of the path
+                                                               Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+                                                               copy.addAll(currentPath);
+
+                                                               extractionStates.push(
+                                                                       new ExtractionState<K, V>(
+                                                                               edge.getTarget(),
+                                                                               edge.getVersion(),
+                                                                               copy));
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               return result;
+       }
+
+       /**
+        * Increases the reference counter for the given value, key, timestamp entry so that it is not
+        * accidentally removed. Nothing happens if no such entry is stored.
+        *
+        * @param key Key of the value to lock
+        * @param value Value to lock
+        * @param timestamp Timestamp of the value to lock
+        */
+       public void lock(final K key, final V value, final long timestamp) {
+               final SharedBufferEntry<K, V> lockedEntry = get(key, value, timestamp);
+
+               if (lockedEntry == null) {
+                       // nothing is stored under the triple, so there is nothing to lock
+                       return;
+               }
+
+               lockedEntry.increaseReferenceCounter();
+       }
+
+       /**
+        * Decreases the reference counter for the given value, key, timestamp entry so that it can be
+        * removed once the reference counter reaches 0. Nothing happens if no such entry is stored.
+        *
+        * @param key Key of the value to release
+        * @param value Value to release
+        * @param timestamp Timestamp of the value to release
+        */
+       public void release(final K key, final V value, final long timestamp) {
+               final SharedBufferEntry<K, V> releasedEntry = get(key, value, timestamp);
+
+               if (releasedEntry == null) {
+                       // nothing is stored under the triple, so there is nothing to release
+                       return;
+               }
+
+               releasedEntry.decreaseReferenceCounter();
+       }
+
+       /**
+        * Removes the given value, key, timestamp entry if its reference counter is 0. It will also
+        * release the next element in its previous relation and apply remove to this element
+        * recursively. Nothing happens if no such entry is stored.
+        *
+        * @param key Key of the value to remove
+        * @param value Value to remove
+        * @param timestamp Timestamp of the value to remove
+        */
+       public void remove(final K key, final V value, final long timestamp) {
+               final SharedBufferEntry<K, V> startEntry = get(key, value, timestamp);
+
+               if (startEntry == null) {
+                       return;
+               }
+
+               internalRemove(startEntry);
+       }
+
+       /**
+        * Custom serialization hook. The pages are kept in a transient field, so they are written
+        * manually after the default fields. The stream layout is:
+        * <ol>
+        *   <li>the default serializable fields,</li>
+        *   <li>the number of pages,</li>
+        *   <li>per page: the page key, the number of entries and, per entry, the value (written
+        *       with the value serializer), the timestamp and the reference counter,</li>
+        *   <li>the total number of edges,</li>
+        *   <li>per edge: the id of the source entry, the id of the target entry (-1 if the edge
+        *       has no target) and the version of the edge.</li>
+        * </ol>
+        * The entry ids are assigned in the order in which the entries are written.
+        *
+        * @param oos Stream to write the shared buffer to
+        * @throws IOException If writing to the stream fails
+        */
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(oos);
+               Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
+               int totalEdges = 0;
+               int entryCounter = 0;
+
+               oos.defaultWriteObject();
+
+               // number of pages
+               oos.writeInt(pages.size());
+
+               for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+                       SharedBufferPage<K, V> page = pageEntry.getValue();
+
+                       // key for the current page
+                       oos.writeObject(page.getKey());
+                       // number of page entries
+                       oos.writeInt(page.entries.size());
+
+                       for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+                               // serialize the sharedBufferEntry
+                               SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+                               // assign id to the sharedBufferEntry for the future serialization of the previous
+                               // relation
+                               entryIDs.put(sharedBuffer, entryCounter++);
+
+                               ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
+
+                               valueSerializer.serialize(valueTimeWrapper.value, target);
+                               oos.writeLong(valueTimeWrapper.getTimestamp());
+
+                               // the edges themselves are written in the second pass, here they are only counted
+                               int edges = sharedBuffer.edges.size();
+                               totalEdges += edges;
+
+                               oos.writeInt(sharedBuffer.referenceCounter);
+                       }
+               }
+
+               // write the edges between the shared buffer entries
+               oos.writeInt(totalEdges);
+
+               for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
+                       SharedBufferPage<K, V> page = pageEntry.getValue();
+
+                       for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
+                               SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+
+                               if (!entryIDs.containsKey(sharedBuffer)) {
+                                       throw new RuntimeException("Could not find id for entry: " + sharedBuffer);
+                               } else {
+                                       int id = entryIDs.get(sharedBuffer);
+
+                                       for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
+                                               // in order to serialize the previous relation we simply serialize the ids
+                                               // of the source and target SharedBufferEntry
+                                               if (edge.target != null) {
+                                                       if (!entryIDs.containsKey(edge.getTarget())) {
+                                                               throw new RuntimeException("Could not find id for entry: " + edge.getTarget());
+                                                       } else {
+                                                               int targetId = entryIDs.get(edge.getTarget());
+
+                                                               oos.writeInt(id);
+                                                               oos.writeInt(targetId);
+                                                               oos.writeObject(edge.version);
+                                                       }
+                                               } else {
+                                                       // an edge without a target is marked with the target id -1
+                                                       oos.writeInt(id);
+                                                       oos.writeInt(-1);
+                                                       oos.writeObject(edge.version);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Custom deserialization hook. It restores the transient pages in two steps: first all
+        * pages with their entries (value, timestamp and reference counter) are read, then the
+        * edges are read and attached to their source entries. An edge refers to its source and
+        * target entry by the position in which the entries were read; a negative target index
+        * denotes an edge without a target.
+        *
+        * @param ois Stream to read the shared buffer from
+        * @throws IOException If reading from the stream fails
+        * @throws ClassNotFoundException If the class of a serialized object cannot be found
+        * @throws RuntimeException If an edge refers to an entry index which does not exist
+        */
+       private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+               DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois);
+               ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
+               ois.defaultReadObject();
+
+               this.pages = new HashMap<>();
+
+               int numberPages = ois.readInt();
+
+               for (int i = 0; i < numberPages; i++) {
+                       // key of the page
+                       @SuppressWarnings("unchecked")
+                       K key = (K)ois.readObject();
+
+                       SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
+
+                       pages.put(key, page);
+
+                       int numberEntries = ois.readInt();
+
+                       for (int j = 0; j < numberEntries; j++) {
+                               // restore the SharedBufferEntries for the given page
+                               V value = valueSerializer.deserialize(source);
+                               long timestamp = ois.readLong();
+
+                               ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp);
+                               SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
+
+                               sharedBufferEntry.referenceCounter = ois.readInt();
+
+                               page.entries.put(valueTimeWrapper, sharedBufferEntry);
+
+                               // the position in this list is the id under which edges refer to the entry
+                               entryList.add(sharedBufferEntry);
+                       }
+               }
+
+               // read the edges of the shared buffer entries
+               int numberEdges = ois.readInt();
+
+               for (int j = 0; j < numberEdges; j++) {
+                       int sourceIndex = ois.readInt();
+                       int targetIndex = ois.readInt();
+
+                       if (sourceIndex >= entryList.size() || sourceIndex < 0) {
+                               throw new RuntimeException("Could not find source entry with index " + sourceIndex +
+                                       ". This indicates a corrupted state.");
+                       } else {
+                               // We've already deserialized the shared buffer entry. Simply read its ID and
+                               // retrieve the buffer entry from the list of entries
+                               SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
+
+                               final DeweyNumber version = (DeweyNumber) ois.readObject();
+                               final SharedBufferEntry<K, V> target;
+
+                               if (targetIndex >= 0) {
+                                       if (targetIndex >= entryList.size()) {
+                                               throw new RuntimeException("Could not find target entry with index " + targetIndex +
+                                                       ". This indicates a corrupted state.");
+                                       } else {
+                                               target = entryList.get(targetIndex);
+                                       }
+                               } else {
+                                       // a negative target index marks an edge without a target
+                                       target = null;
+                               }
+
+                               sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version));
+                       }
+               }
+       }
+
+       /**
+        * Looks up the shared buffer entry which is stored for the given key, value and timestamp.
+        *
+        * @param key Key of the value
+        * @param value Value
+        * @param timestamp Timestamp of the value
+        * @return The stored entry, or null if there is no page for the key or the page does not
+        *         contain the value with the given timestamp
+        */
+       private SharedBufferEntry<K, V> get(
+               final K key,
+               final V value,
+               final long timestamp) {
+               final SharedBufferPage<K, V> page = pages.get(key);
+
+               if (page == null) {
+                       return null;
+               }
+
+               return page.get(new ValueTimeWrapper<V>(value, timestamp));
+       }
+
+       /**
+        * Removes the given entry if its reference counter is 0. For every removed entry, the
+        * reference counters of the targets of its edges are decreased and the targets are
+        * examined in the same way. The traversal is iterative and uses an explicit stack.
+        *
+        * @param entry Entry at which the removal starts
+        */
+       private void internalRemove(final SharedBufferEntry<K, V> entry) {
+               final Stack<SharedBufferEntry<K, V>> pending = new Stack<>();
+               pending.push(entry);
+
+               while (!pending.isEmpty()) {
+                       final SharedBufferEntry<K, V> candidate = pending.pop();
+
+                       if (candidate.getReferenceCounter() != 0) {
+                               // still referenced: keep the entry and do not follow its edges
+                               continue;
+                       }
+
+                       candidate.remove();
+
+                       for (SharedBufferEdge<K, V> edge : candidate.getEdges()) {
+                               final SharedBufferEntry<K, V> predecessor = edge.getTarget();
+
+                               if (predecessor != null) {
+                                       predecessor.decreaseReferenceCounter();
+                                       pending.push(predecessor);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Renders every page of the buffer as a "Key: " line followed by a "Value: " line.
+        */
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder();
+
+               for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
+                       text.append("Key: ").append(pageEntry.getKey()).append("\n")
+                               .append("Value: ").append(pageEntry.getValue()).append("\n");
+               }
+
+               return text.toString();
+       }
+
+       /**
+        * Two shared buffers are equal if their pages and their value serializers are equal.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof SharedBuffer)) {
+                       return false;
+               }
+
+               @SuppressWarnings("unchecked")
+               final SharedBuffer<K, V> that = (SharedBuffer<K, V>) obj;
+
+               if (!pages.equals(that.pages)) {
+                       return false;
+               }
+
+               return valueSerializer.equals(that.valueSerializer);
+       }
+
+       /**
+        * Combines the hash codes of the pages and of the value serializer, i.e. of the same
+        * members which are compared in equals.
+        */
+       @Override
+       public int hashCode() {
+               final int combinedHash = Objects.hash(pages, valueSerializer);
+
+               return combinedHash;
+       }
+
+       /**
+        * The SharedBufferPage represents a set of elements which have been stored under the same key.
+        *
+        * @param <K> Type of the key
+        * @param <V> Type of the value
+        */
+       private static class SharedBufferPage<K, V> {
+
+               // key of the page
+               private final K key;
+
+               // Map of entries which are stored in this page
+               private final HashMap<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
+
+               /**
+                * Creates an empty page for the given key.
+                *
+                * @param key Key under which the elements of this page are stored
+                */
+               public SharedBufferPage(final K key) {
+                       this.key = key;
+                       entries = new HashMap<>();
+               }
+
+               public K getKey() {
+                       return key;
+               }
+
+               /**
+                * Adds a new value time pair to the page. The new entry is linked to the previous entry
+                * with the given version. If the page already contains an entry for the value time
+                * pair, the existing entry is reused and only the new edge is added to it. The
+                * reference counter of the previous entry is increased if there is one.
+                *
+                * @param valueTime Value time pair to be stored
+                * @param previous Previous shared buffer entry to which the new entry shall be linked; may be null
+                * @param version Version of the relation between the new and the previous entry
+                */
+               public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) {
+                       SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime);
+
+                       if (sharedBufferEntry == null) {
+                               sharedBufferEntry = new SharedBufferEntry<K, V>(valueTime, this);
+
+                               entries.put(valueTime, sharedBufferEntry);
+                       }
+
+                       SharedBufferEdge<K, V> newEdge;
+
+                       if (previous != null) {
+                               newEdge = new SharedBufferEdge<>(previous, version);
+                               // the new edge is a reference to the previous entry
+                               previous.increaseReferenceCounter();
+                       } else {
+                               newEdge = new SharedBufferEdge<>(null, version);
+                       }
+
+                       sharedBufferEntry.addEdge(newEdge);
+               }
+
+               public boolean contains(final ValueTimeWrapper<V> valueTime) {
+                       return entries.containsKey(valueTime);
+               }
+
+               public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
+                       return entries.get(valueTime);
+               }
+
+               /**
+                * Removes all entries from the map whose timestamp is smaller than or equal to the
+                * pruning timestamp.
+                *
+                * NOTE(review): edges of other entries which point to a pruned entry are not touched
+                * here - confirm that the callers can cope with such edges.
+                *
+                * @param pruningTimestamp Timestamp for the pruning
+                */
+               public void prune(long pruningTimestamp) {
+                       Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
+
+                       // The entries live in a HashMap whose iteration order is unrelated to the timestamps.
+                       // Therefore, every entry has to be inspected; stopping at the first entry which is
+                       // still valid could leave expired entries behind.
+                       while (iterator.hasNext()) {
+                               SharedBufferEntry<K, V> entry = iterator.next().getValue();
+
+                               if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
+                                       iterator.remove();
+                               }
+                       }
+               }
+
+               public boolean isEmpty() {
+                       return entries.isEmpty();
+               }
+
+               /**
+                * Removes the entry which is stored for the given value time pair.
+                *
+                * @param valueTime Value time pair whose entry shall be removed
+                * @return The removed entry or null if the page did not contain such an entry
+                */
+               public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) {
+                       return entries.remove(valueTime);
+               }
+
+               @Override
+               public String toString() {
+                       StringBuilder builder = new StringBuilder();
+
+                       builder.append("SharedBufferPage(\n");
+
+                       for (SharedBufferEntry<K, V> entry: entries.values()) {
+                               builder.append(entry.toString()).append("\n");
+                       }
+
+                       builder.append(")");
+
+                       return builder.toString();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof SharedBufferPage) {
+                               @SuppressWarnings("unchecked")
+                               SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
+
+                               // null-safe on the key, consistent with Objects.hash in hashCode
+                               return Objects.equals(key, other.key) && entries.equals(other.entries);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(key, entries);
+               }
+       }
+
+       /**
+        * Entry of a {@link SharedBufferPage}. The entry contains the value 
timestamp pair, a set of
+        * edges to other shared buffer entries denoting a relation, a 
reference to the owning page and
+        * a reference counter. The reference counter counts how many 
references are kept to this entry.
+        *
+        * @param <K> Type of the key
+        * @param <V> Type of the value
+        */
+       private static class SharedBufferEntry<K, V> {
+               private final ValueTimeWrapper<V> valueTime;
+               private final Set<SharedBufferEdge<K, V>> edges;
+               private final SharedBufferPage<K, V> page;
+               private int referenceCounter;
+
+               public SharedBufferEntry(
+                       final ValueTimeWrapper<V> valueTime,
+                       final SharedBufferPage<K, V> page) {
+                       this(valueTime, null, page);
+               }
+
+               public SharedBufferEntry(
+                       final ValueTimeWrapper<V> valueTime,
+                       final SharedBufferEdge<K, V> edge,
+                       final SharedBufferPage<K, V> page) {
+                       this.valueTime = valueTime;
+                       edges = new HashSet<>();
+
+                       if (edge != null) {
+                               edges.add(edge);
+                       }
+
+                       referenceCounter = 0;
+
+                       this.page = page;
+               }
+
+               public ValueTimeWrapper<V> getValueTime() {
+                       return valueTime;
+               }
+
+               public Collection<SharedBufferEdge<K, V>> getEdges() {
+                       return edges;
+               }
+
+               public K getKey() {
+                       return page.getKey();
+               }
+
+               public void addEdge(SharedBufferEdge<K, V> edge) {
+                       edges.add(edge);
+               }
+
+               public boolean remove() {
+                       if (page != null) {
+                               page.remove(valueTime);
+
+                               return true;
+                       } else {
+                               return false;
+                       }
+               }
+
+               public void increaseReferenceCounter() {
+                       referenceCounter++;
+               }
+
+               public void decreaseReferenceCounter() {
+                       if (referenceCounter > 0) {
+                               referenceCounter--;
+                       }
+               }
+
+               public int getReferenceCounter() {
+                       return referenceCounter;
+               }
+
+               @Override
+               public String toString() {
+                       return "SharedBufferEntry(" + valueTime + ", [" + 
StringUtils.join(edges, ", ") + "], " + referenceCounter + ")";
+               }
+
+               /**
+                * Compares this entry with the given object. Two entries are equal if their
+                * value/timestamp pairs, their keys, their reference counters and their edge sets are
+                * equal.
+                *
+                * @param obj Object to compare with
+                * @return {@code true} if {@code obj} is an equal shared buffer entry
+                */
+               @Override
+               public boolean equals(Object obj) {
+                       if (!(obj instanceof SharedBufferEntry)) {
+                               return false;
+                       }
+
+                       @SuppressWarnings("unchecked")
+                       SharedBufferEntry<K, V> that = (SharedBufferEntry<K, V>) obj;
+
+                       if (!valueTime.equals(that.valueTime)) {
+                               return false;
+                       }
+
+                       if (!getKey().equals(that.getKey())) {
+                               return false;
+                       }
+
+                       if (referenceCounter != that.referenceCounter) {
+                               return false;
+                       }
+
+                       return edges.equals(that.edges);
+               }
+
+               /**
+                * Computes the hash code from the value/timestamp pair, the key, the reference counter
+                * and the edge set, i.e. from the same parts which are compared by {@code equals}.
+                * <p>
+                * Because the reference counter and the edge set can change, the hash code of an entry
+                * changes with them.
+                */
+               @Override
+               public int hashCode() {
+                       return Objects.hash(valueTime, getKey(), 
referenceCounter, edges);
+               }
+       }
+
+       /**
+        * Versioned edge pointing to a target shared buffer entry.
+        * <p>
+        * Equality and hash code are based on the version and, if a target is set, on the key and
+        * the value/timestamp pair of the target entry.
+        *
+        * @param <K> Type of the key
+        * @param <V> Type of the value
+        */
+       public static class SharedBufferEdge<K, V> {
+               private final SharedBufferEntry<K, V> target;
+               private final DeweyNumber version;
+
+               public SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
+                       this.target = target;
+                       this.version = version;
+               }
+
+               public SharedBufferEntry<K, V> getTarget() {
+                       return this.target;
+               }
+
+               public DeweyNumber getVersion() {
+                       return this.version;
+               }
+
+               @Override
+               public String toString() {
+                       return "SharedBufferEdge(" + target + ", " + version + ")";
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (!(obj instanceof SharedBufferEdge)) {
+                               return false;
+                       }
+
+                       @SuppressWarnings("unchecked")
+                       SharedBufferEdge<K, V> that = (SharedBufferEdge<K, V>) obj;
+
+                       if (!version.equals(that.version)) {
+                               return false;
+                       }
+
+                       if (target == null || that.target == null) {
+                               // edges without a target are only equal to other edges without a target
+                               return target == that.target;
+                       }
+
+                       return target.getKey().equals(that.target.getKey()) &&
+                               target.getValueTime().equals(that.target.getValueTime());
+               }
+
+               @Override
+               public int hashCode() {
+                       if (target == null) {
+                               return version.hashCode();
+                       }
+
+                       return Objects.hash(target.getKey(), target.getValueTime(), version);
+               }
+       }
+
+       /**
+        * Immutable wrapper for a value timestamp pair.
+        * <p>
+        * Two wrappers are equal if their timestamps are identical and their values are equal.
+        * {@code equals} and {@code hashCode} accept a {@code null} value.
+        *
+        * @param <V> Type of the value
+        */
+       static class ValueTimeWrapper<V> {
+               private final V value;
+               private final long timestamp;
+
+               /**
+                * @param value Value to wrap
+                * @param timestamp Timestamp belonging to the value
+                */
+               public ValueTimeWrapper(final V value, final long timestamp) {
+                       this.value = value;
+                       this.timestamp = timestamp;
+               }
+
+               public V getValue() {
+                       return value;
+               }
+
+               public long getTimestamp() {
+                       return timestamp;
+               }
+
+               @Override
+               public String toString() {
+                       return "ValueTimeWrapper(" + value + ", " + timestamp + ")";
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof ValueTimeWrapper) {
+                               ValueTimeWrapper<?> other = (ValueTimeWrapper<?>) obj;
+
+                               // Objects.equals avoids a NullPointerException if the wrapped value is null
+                               return timestamp == other.getTimestamp() && Objects.equals(value, other.getValue());
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       // fold the 64 bit timestamp into 32 bits; a null value contributes 0
+                       return (int) (timestamp ^ (timestamp >>> 32)) + 31 * Objects.hashCode(value);
+               }
+       }
+
+       /**
+        * Immutable holder for the state of an ongoing extraction: the entry which is currently
+        * visited, the version which is followed along the versioned entry edges and the path of
+        * entries collected so far.
+        *
+        * @param <K> Type of the key
+        * @param <V> Type of the value
+        */
+       private static class ExtractionState<K, V> {
+               private final SharedBufferEntry<K, V> entry;
+               private final DeweyNumber version;
+               private final Stack<SharedBufferEntry<K, V>> path;
+
+               public ExtractionState(
+                       final SharedBufferEntry<K, V> entry,
+                       final DeweyNumber version,
+                       final Stack<SharedBufferEntry<K, V>> path) {
+
+                       this.entry = entry;
+                       this.version = version;
+                       this.path = path;
+               }
+
+               public SharedBufferEntry<K, V> getEntry() {
+                       return this.entry;
+               }
+
+               public DeweyNumber getVersion() {
+                       return this.version;
+               }
+
+               public Stack<SharedBufferEntry<K, V>> getPath() {
+                       return this.path;
+               }
+
+               @Override
+               public String toString() {
+                       StringBuilder text = new StringBuilder("ExtractionState(");
+
+                       text.append(entry).append(", ");
+                       text.append(version).append(", [");
+                       text.append(StringUtils.join(path, ", ")).append("])");
+
+                       return text.toString();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
new file mode 100644
index 0000000..50b2cf3
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A single state of the {@link NFA}.
+ * <p>
+ * A state consists of a name, a state type and a collection of outgoing state transitions. The
+ * transitions describe under which conditions another state can be entered from this state.
+ *
+ * @param <T> Type of the input events
+ */
+public class State<T> implements Serializable {
+       private static final long serialVersionUID = 6658700025989097781L;
+
+       private final String name;
+       private final StateType stateType;
+       private final Collection<StateTransition<T>> stateTransitions;
+
+       /**
+        * Creates a state without any transitions.
+        *
+        * @param name Name of the state
+        * @param stateType Type of the state
+        */
+       public State(final String name, final StateType stateType) {
+               this.name = name;
+               this.stateType = stateType;
+               this.stateTransitions = new ArrayList<>();
+       }
+
+       public boolean isFinal() {
+               return StateType.Final == stateType;
+       }
+
+       public boolean isStart() {
+               return StateType.Start == stateType;
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * Returns the internal transition collection of this state (not a copy).
+        */
+       public Collection<StateTransition<T>> getStateTransitions() {
+               return stateTransitions;
+       }
+
+       public void addStateTransition(final StateTransition<T> stateTransition) {
+               stateTransitions.add(stateTransition);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof State)) {
+                       return false;
+               }
+
+               @SuppressWarnings("unchecked")
+               State<T> that = (State<T>) obj;
+
+               if (!name.equals(that.name)) {
+                       return false;
+               }
+
+               if (stateType != that.stateType) {
+                       return false;
+               }
+
+               return stateTransitions.equals(that.stateTransitions);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder text = new StringBuilder("State(");
+
+               text.append(name);
+               text.append(", ");
+               text.append(stateType);
+               text.append(", [\n");
+
+               // one transition per line, each followed by a comma
+               for (StateTransition<T> transition : stateTransitions) {
+                       text.append(transition);
+                       text.append(",\n");
+               }
+
+               return text.append("])").toString();
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(name, stateType, stateTransitions);
+       }
+
+       /**
+        * Set of valid state types.
+        */
+       public enum StateType {
+               /** The state is a starting state for the NFA. */
+               Start,
+
+               /** The state is a final state for the NFA. */
+               Final,
+
+               /** The state is neither a start nor a final state. */
+               Normal
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
new file mode 100644
index 0000000..479f28a
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Transition to a target {@link State}, consisting of the action to perform and an optional
+ * filter condition.
+ * <p>
+ * Equality and hash code only consider the action and the name of the target state; the
+ * condition is not part of them.
+ *
+ * @param <T> Type of the input events
+ */
+public class StateTransition<T> implements Serializable {
+       private static final long serialVersionUID = -4825345749997891838L;
+
+       private final StateTransitionAction action;
+       private final State<T> targetState;
+       private final FilterFunction<T> condition;
+
+       public StateTransition(
+                       final StateTransitionAction action,
+                       final State<T> targetState,
+                       final FilterFunction<T> condition) {
+               this.action = action;
+               this.targetState = targetState;
+               this.condition = condition;
+       }
+
+       public StateTransitionAction getAction() {
+               return this.action;
+       }
+
+       public State<T> getTargetState() {
+               return this.targetState;
+       }
+
+       public FilterFunction<T> getCondition() {
+               return this.condition;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof StateTransition)) {
+                       return false;
+               }
+
+               @SuppressWarnings("unchecked")
+               StateTransition<T> that = (StateTransition<T>) obj;
+
+               if (action != that.action) {
+                       return false;
+               }
+
+               // only the names are compared because the transition might be reflexive
+               return targetState.getName().equals(that.targetState.getName());
+       }
+
+       @Override
+       public int hashCode() {
+               // we have to take the name of targetState because the transition might be reflexive
+               return Objects.hash(action, targetState.getName());
+       }
+
+       @Override
+       public String toString() {
+               String suffix = condition != null ? ", with filter)" : ")";
+
+               return "StateTransition(" + action + ", " + targetState.getName() + suffix;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
new file mode 100644
index 0000000..70fc7fb
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+/**
+ * Set of actions when doing a state transition from a {@link State} to another.
+ */
+public enum StateTransitionAction {
+       /** Take the current event and assign it to the new state. */
+       TAKE,
+
+       /** Ignore the current event and do the state transition. */
+       IGNORE,
+
+       /**
+        * Do the state transition and keep the current event for further processing
+        * (epsilon transition).
+        */
+       PROCEED
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
new file mode 100644
index 0000000..f2561d4
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
+ * {@link NFAFactory}.
+ */
+public class NFACompiler {
+
+       protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+
+       /**
+        * Compiles the given pattern into a {@link NFA}.
+        *
+        * @param pattern Definition of sequence pattern
+        * @param inputTypeSerializer Serializer for the input type
+        * @param <T> Type of the input events
+        * @return Non-deterministic finite automaton representing the given pattern
+        */
+       public static <T> NFA<T> compile(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+               NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer);
+
+               return factory.createNFA();
+       }
+
+       /**
+        * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
+        * multiple NFAs.
+        * <p>
+        * The pattern is traversed from its last element to its first one. The last pattern becomes
+        * the final state, every preceding pattern becomes a normal state with a TAKE transition to
+        * its succeeding state, guarded by the filter function of the succeeding pattern. If the
+        * succeeding pattern is a {@link FollowedByPattern}, a reflexive IGNORE transition is added
+        * as well. Finally, a start state with a TAKE transition to the state of the first pattern
+        * is added.
+        * <p>
+        * The window time of the resulting NFAs is the minimum of all window times defined by the
+        * patterns, or 0 if no pattern defines a window time.
+        *
+        * @param pattern Definition of sequence pattern; {@code null} yields a factory for empty NFAs
+        * @param inputTypeSerializer Serializer for the input type
+        * @param <T> Type of the input events
+        * @return Factory for NFAs corresponding to the given pattern
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> NFAFactory<T> compileFactory(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+               if (pattern == null) {
+                       // return a factory for empty NFAs
+                       return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList());
+               }
+
+               // set of all generated states
+               Map<String, State<T>> states = new HashMap<>();
+
+               Pattern<T, ?> succeedingPattern;
+               State<T> succeedingState;
+               Pattern<T, ?> currentPattern = pattern;
+
+               // we're traversing the pattern from the end to the beginning --> the first state is the final state
+               State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
+
+               states.put(currentPattern.getName(), currentState);
+
+               // windowTime holds the minimum of the window times seen so far. Whether a window time has
+               // been seen at all is tracked separately, because the default of 0 must not win the
+               // minimum comparison against window times defined by preceding patterns.
+               Time lastWindowTime = currentPattern.getWindowTime();
+               boolean windowTimeDefined = lastWindowTime != null;
+               long windowTime = windowTimeDefined ? lastWindowTime.toMilliseconds() : 0L;
+
+               while (currentPattern.getPrevious() != null) {
+                       succeedingPattern = currentPattern;
+                       succeedingState = currentState;
+                       currentPattern = currentPattern.getPrevious();
+
+                       Time currentWindowTime = currentPattern.getWindowTime();
+
+                       if (currentWindowTime != null) {
+                               long currentWindowMillis = currentWindowTime.toMilliseconds();
+
+                               if (!windowTimeDefined || currentWindowMillis < windowTime) {
+                                       // the window time is the global minimum of all window times of each state
+                                       windowTime = currentWindowMillis;
+                                       windowTimeDefined = true;
+                               }
+                       }
+
+                       if (states.containsKey(currentPattern.getName())) {
+                               currentState = states.get(currentPattern.getName());
+                       } else {
+                               currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
+                               states.put(currentState.getName(), currentState);
+                       }
+
+                       currentState.addStateTransition(new StateTransition<T>(
+                               StateTransitionAction.TAKE,
+                               succeedingState,
+                               (FilterFunction<T>) succeedingPattern.getFilterFunction()));
+
+                       if (succeedingPattern instanceof FollowedByPattern) {
+                               // the followed by pattern entails a reflexive ignore transition
+                               currentState.addStateTransition(new StateTransition<T>(
+                                       StateTransitionAction.IGNORE,
+                                       currentState,
+                                       null
+                               ));
+                       }
+               }
+
+               // add the beginning state
+               final State<T> beginningState;
+
+               if (states.containsKey(BEGINNING_STATE_NAME)) {
+                       beginningState = states.get(BEGINNING_STATE_NAME);
+               } else {
+                       beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
+                       states.put(BEGINNING_STATE_NAME, beginningState);
+               }
+
+               beginningState.addStateTransition(new StateTransition<T>(
+                       StateTransitionAction.TAKE,
+                       currentState,
+                       (FilterFunction<T>) currentPattern.getFilterFunction()
+               ));
+
+               return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()));
+       }
+
+       /**
+        * Factory interface for {@link NFA}.
+        *
+        * @param <T> Type of the input events which are processed by the NFA
+        */
+       public interface NFAFactory<T> extends Serializable {
+               NFA<T> createNFA();
+       }
+
+       /**
+        * Implementation of the {@link NFAFactory} interface.
+        * <p>
+        * The implementation takes the input type serializer, the window time and the set of
+        * states and their transitions to be able to create an NFA from them.
+        *
+        * @param <T> Type of the input events which are processed by the NFA
+        */
+       private static class NFAFactoryImpl<T> implements NFAFactory<T> {
+
+               private static final long serialVersionUID = 8939783698296714379L;
+
+               private final TypeSerializer<T> inputTypeSerializer;
+               private final long windowTime;
+               private final Collection<State<T>> states;
+
+               private NFAFactoryImpl(TypeSerializer<T> inputTypeSerializer, long windowTime, Collection<State<T>> states) {
+                       this.inputTypeSerializer = inputTypeSerializer;
+                       this.windowTime = windowTime;
+                       this.states = states;
+               }
+
+               /**
+                * Creates a new NFA which uses a duplicate of the input type serializer and contains
+                * the states of this factory.
+                */
+               @Override
+               public NFA<T> createNFA() {
+                       NFA<T> result = new NFA<>(inputTypeSerializer.duplicate(), windowTime);
+
+                       result.addStates(states);
+
+                       return result;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
new file mode 100644
index 0000000..a943f0d
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for CEP pattern operators. The operator uses a {@link NFA} to detect complex event
+ * patterns. The detected event patterns are then emitted to the downstream operators.
+ * <p>
+ * In processing time mode every element is given to the NFA right away. Otherwise the elements
+ * are only added to the priority queue provided by the subclass.
+ *
+ * @param <IN> Type of the input elements
+ */
+public abstract class AbstractCEPPatternOperator<IN>
+       extends AbstractStreamOperator<Map<String, IN>>
+       implements OneInputStreamOperator<IN, Map<String, IN>> {
+
+       private static final long serialVersionUID = -4166778210774160757L;
+
+       protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+       private final TypeSerializer<IN> inputSerializer;
+       private final boolean isProcessingTime;
+
+       public AbstractCEPPatternOperator(
+                       final TypeSerializer<IN> inputSerializer,
+                       final boolean isProcessingTime) {
+               this.inputSerializer = inputSerializer;
+               this.isProcessingTime = isProcessingTime;
+       }
+
+       public TypeSerializer<IN> getInputSerializer() {
+               return inputSerializer;
+       }
+
+       protected abstract NFA<IN> getNFA() throws IOException;
+
+       protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               if (isProcessingTime) {
+                       // there can be no out of order elements in processing time
+                       processEvent(getNFA(), element.getValue(), element.getTimestamp());
+                       return;
+               }
+
+               // event time processing
+               // we have to buffer the elements until we receive the proper watermark
+               PriorityQueue<StreamRecord<IN>> buffer = getPriorityQueue();
+               StreamRecord<IN> bufferedRecord = element;
+
+               if (getExecutionConfig().isObjectReuseEnabled()) {
+                       // copy the StreamRecord so that it cannot be changed
+                       bufferedRecord = new StreamRecord<IN>(
+                               inputSerializer.copy(element.getValue()),
+                               element.getTimestamp());
+               }
+
+               buffer.offer(bufferedRecord);
+       }
+
+       /**
+        * Process the given event by giving it to the NFA and outputting the produced set of matched
+        * event sequences. A single stream record carrying the given timestamp is reused to emit all
+        * matched sequences.
+        *
+        * @param nfa NFA to be used for the event detection
+        * @param event The current event to be processed
+        * @param timestamp The timestamp of the event
+        */
+       protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+               Collection<Map<String, IN>> matches = nfa.process(event, timestamp);
+
+               if (matches.isEmpty()) {
+                       return;
+               }
+
+               StreamRecord<Map<String, IN>> outputRecord = new StreamRecord<Map<String, IN>>(null, timestamp);
+
+               for (Map<String, IN> match : matches) {
+                       outputRecord.replace(match);
+                       output.collect(outputRecord);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
new file mode 100644
index 0000000..2ad152e
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.PriorityQueue;
+
+/**
+ * CEP pattern operator implementation which is used for non keyed streams. 
Consequently,
+ * the operator state only includes a single {@link NFA} and a priority queue 
to order out of order
+ * elements in case of event time processing.
+ *
+ * @param <IN> Type of the input elements
+ */
+public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> {
+       private static final long serialVersionUID = 7487334510746595640L;
+
+       private final StreamRecordSerializer<IN> streamRecordSerializer;
+
+       // global nfa for all elements
+       private NFA<IN> nfa;
+
+       // queue to buffer out of order stream records
+       private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
+
+       public CEPPatternOperator(
+                       TypeSerializer<IN> inputSerializer,
+                       boolean isProcessingTime,
+                       NFACompiler.NFAFactory<IN> nfaFactory) {
+               super(inputSerializer, isProcessingTime);
+
+               this.streamRecordSerializer = new 
StreamRecordSerializer<>(inputSerializer);
+               this.nfa = nfaFactory.createNFA();
+       }
+
+       @Override
+       public void open() {
+               if (priorityQueue == null) {
+                       priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new 
StreamRecordComparator<IN>());
+               }
+       }
+
+       @Override
+       protected NFA<IN> getNFA() throws IOException {
+               return nfa;
+       }
+
+       @Override
+       protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
+               return priorityQueue;
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               while(!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+                       StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+                       processEvent(nfa, streamRecord.getValue(), 
streamRecord.getTimestamp());
+               }
+       }
+
+       @Override
+       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+
+               final StateBackend.CheckpointStateOutputStream os = 
this.getStateBackend().createCheckpointStateOutputStream(
+                       checkpointId,
+                       timestamp);
+
+               final ObjectOutputStream oos = new ObjectOutputStream(os);
+               final StateBackend.CheckpointStateOutputView ov = new 
StateBackend.CheckpointStateOutputView(os);
+
+               oos.writeObject(nfa);
+
+               ov.writeInt(priorityQueue.size());
+
+               for (StreamRecord<IN> streamRecord: priorityQueue) {
+                       streamRecordSerializer.serialize(streamRecord, ov);
+               }
+
+               taskState.setOperatorState(os.closeAndGetHandle());
+
+               return taskState;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
+               super.restoreState(state, recoveryTimestamp);
+
+               StreamStateHandle stream = 
(StreamStateHandle)state.getOperatorState();
+
+               final InputStream is = 
stream.getState(getUserCodeClassloader());
+               final ObjectInputStream ois = new ObjectInputStream(is);
+               final DataInputViewStreamWrapper div = new 
DataInputViewStreamWrapper(is);
+
+               nfa = (NFA<IN>)ois.readObject();
+
+               int numberPriorityQueueEntries = div.readInt();
+
+               priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new 
StreamRecordComparator<IN>());
+
+               for (int i = 0; i <numberPriorityQueueEntries; i++) {
+                       
priorityQueue.offer(streamRecordSerializer.deserialize(div));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
new file mode 100644
index 0000000..03758c7
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * CEP pattern operator implementation for a keyed input stream. For each key, 
the operator creates
+ * a {@link NFA} and a priority queue to buffer out of order elements. Both 
data structures are
+ * stored using the key value state. Additionally, the set of all seen keys is 
kept as part of the
+ * operator state. This is necessary to trigger the execution for all keys 
upon receiving a new
+ * watermark.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ */
+public class KeyedCEPPatternOperator<IN, KEY> extends 
AbstractCEPPatternOperator<IN> {
+       private static final long serialVersionUID = -7234999752950159178L;
+
+       private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorState";
+       private static final String PRIORIRY_QUEUE_STATE_NAME = 
"priorityQueueStateName";
+
+       // necessary to extract the key from the input elements
+       private final KeySelector<IN, KEY> keySelector;
+
+       // necessary to serialize the set of seen keys
+       private final TypeSerializer<KEY> keySerializer;
+
+       private final PriorityQueueFactory<StreamRecord<IN>> 
priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+       private final NFACompiler.NFAFactory<IN> nfaFactory;
+
+       // stores the keys we've already seen to trigger execution upon 
receiving a watermark
+       // this can be problematic, since it is never cleared
+       // TODO: fix once the state refactoring is completed
+       private transient Set<KEY> keys;
+
+       private transient OperatorState<NFA<IN>> nfaOperatorState;
+       private transient OperatorState<PriorityQueue<StreamRecord<IN>>> 
priorityQueueOperatorState;
+
+       public KeyedCEPPatternOperator(
+                       TypeSerializer<IN> inputSerializer,
+                       boolean isProcessingTime,
+                       KeySelector<IN, KEY> keySelector,
+                       TypeSerializer<KEY> keySerializer,
+                       NFACompiler.NFAFactory<IN> nfaFactory) {
+               super(inputSerializer, isProcessingTime);
+
+               this.keySelector = keySelector;
+               this.keySerializer = keySerializer;
+
+               this.nfaFactory = nfaFactory;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void open() throws Exception {
+               if (keys == null) {
+                       keys = new HashSet<>();
+               }
+
+               if (nfaOperatorState == null) {
+                       nfaOperatorState = this.createKeyValueState(
+                               NFA_OPERATOR_STATE_NAME,
+                               new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) 
(Class<?>) NFA.class, getExecutionConfig()),
+                               null);
+               }
+
+               if (priorityQueueOperatorState == null) {
+                       priorityQueueOperatorState = this.createKeyValueState(
+                               PRIORIRY_QUEUE_STATE_NAME,
+                               new PriorityQueueSerializer<StreamRecord<IN>>(
+                                       new 
StreamRecordSerializer<IN>(getInputSerializer()),
+                                       new 
PriorityQueueStreamRecordFactory<IN>()),
+                               null);
+               }
+       }
+
+       @Override
+       protected NFA<IN> getNFA() throws IOException {
+               NFA<IN> nfa = nfaOperatorState.value();
+
+               if (nfa == null) {
+                       nfa = nfaFactory.createNFA();
+
+                       nfaOperatorState.update(nfa);
+               }
+
+               return nfa;
+       }
+
+       @Override
+       protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
+               PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueOperatorState.value();
+
+               if (priorityQueue == null) {
+                       priorityQueue = 
priorityQueueFactory.createPriorityQueue();
+
+                       priorityQueueOperatorState.update(priorityQueue);
+               }
+
+               return priorityQueue;
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               keys.add(keySelector.getKey(element.getValue()));
+
+               super.processElement(element);
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               // iterate over all keys to trigger the execution of the 
buffered elements
+               for (KEY key: keys) {
+                       setKeyContext(key);
+
+                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+
+                       NFA<IN> nfa = getNFA();
+
+                       while (!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+                               StreamRecord<IN> streamRecord = 
priorityQueue.poll();
+
+                               processEvent(nfa, streamRecord.getValue(), 
streamRecord.getTimestamp());
+                       }
+               }
+       }
+
+       @Override
+       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+
+               StateBackend.CheckpointStateOutputView ov = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+               ov.writeInt(keys.size());
+
+               for (KEY key: keys) {
+                       keySerializer.serialize(key, ov);
+               }
+
+               taskState.setOperatorState(ov.closeAndGetHandle());
+
+               return taskState;
+       }
+
+       @Override
+       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
+               super.restoreState(state, recoveryTimestamp);
+
+               @SuppressWarnings("unchecked")
+               StateHandle<DataInputView> stateHandle = 
(StateHandle<DataInputView>) state;
+
+               DataInputView inputView = 
stateHandle.getState(getUserCodeClassloader());
+
+               if (keys == null) {
+                       keys = new HashSet<>();
+               }
+
+               int numberEntries = inputView.readInt();
+
+               for (int i = 0; i <numberEntries; i++) {
+                       keys.add(keySerializer.deserialize(inputView));
+               }
+       }
+
+       /**
+        * Custom type serializer implementation to serialize priority queues.
+        *
+        * @param <T> Type of the priority queue's elements
+        */
+       private static class PriorityQueueSerializer<T> extends 
TypeSerializer<PriorityQueue<T>> {
+
+               private static final long serialVersionUID = 
-231980397616187715L;
+
+               private final TypeSerializer<T> elementSerializer;
+               private final PriorityQueueFactory<T> factory;
+
+               public PriorityQueueSerializer(final TypeSerializer<T> 
elementSerializer, final PriorityQueueFactory<T> factory) {
+                       this.elementSerializer = elementSerializer;
+                       this.factory = factory;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<PriorityQueue<T>> duplicate() {
+                       return new 
PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
+               }
+
+               @Override
+               public PriorityQueue<T> createInstance() {
+                       return factory.createPriorityQueue();
+               }
+
+               @Override
+               public PriorityQueue<T> copy(PriorityQueue<T> from) {
+                       PriorityQueue<T> result = factory.createPriorityQueue();
+
+                       for (T element: from) {
+                               result.offer(elementSerializer.copy(element));
+                       }
+
+                       return result;
+               }
+
+               @Override
+               public PriorityQueue<T> copy(PriorityQueue<T> from, 
PriorityQueue<T> reuse) {
+                       reuse.clear();
+
+                       for (T element: from) {
+                               reuse.offer(elementSerializer.copy(element));
+                       }
+
+                       return reuse;
+               }
+
+               @Override
+               public int getLength() {
+                       return 0;
+               }
+
+               @Override
+               public void serialize(PriorityQueue<T> record, DataOutputView 
target) throws IOException {
+                       target.writeInt(record.size());
+
+                       for (T element: record) {
+                               elementSerializer.serialize(element, target);
+                       }
+               }
+
+               @Override
+               public PriorityQueue<T> deserialize(DataInputView source) 
throws IOException {
+                       PriorityQueue<T> result = factory.createPriorityQueue();
+
+                       return deserialize(result, source);
+               }
+
+               @Override
+               public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, 
DataInputView source) throws IOException {
+                       reuse.clear();
+
+                       int numberEntries = source.readInt();
+
+                       for (int i = 0; i < numberEntries; i++) {
+                               
reuse.offer(elementSerializer.deserialize(source));
+                       }
+
+                       return reuse;
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof PriorityQueueSerializer) {
+                               @SuppressWarnings("unchecked")
+                               PriorityQueueSerializer<T> other = 
(PriorityQueueSerializer<T>) obj;
+
+                               return factory.equals(other.factory) && 
elementSerializer.equals(other.elementSerializer);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof PriorityQueueSerializer;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(factory, elementSerializer);
+               }
+       }
+
+       private interface PriorityQueueFactory<T> extends Serializable {
+               PriorityQueue<T> createPriorityQueue();
+       }
+
+       private static class PriorityQueueStreamRecordFactory<T> implements 
PriorityQueueFactory<StreamRecord<T>> {
+
+               private static final long serialVersionUID = 
1254766984454616593L;
+
+               @Override
+               public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
+                       return new 
PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new 
StreamRecordComparator<T>());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
new file mode 100644
index 0000000..b290e7b
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Comparator which orders {@link StreamRecord}s by their timestamp in ascending order. The
+ * values of the records are not taken into account.
+ *
+ * @param <IN> Type of the value field of the StreamRecord
+ */
+public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable {
+       private static final long serialVersionUID = 1581054988433915305L;
+
+       /**
+        * @return -1, 0 or 1 if the timestamp of o1 is smaller than, equal to or larger than the
+        * timestamp of o2
+        */
+       @Override
+       public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+               return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
new file mode 100644
index 0000000..d01643d
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * Filter function which is the logical conjunction of two other filter functions: an element
+ * passes iff it passes both of them. The right filter is only evaluated if the left filter
+ * has accepted the element.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class AndFilterFunction<T> implements FilterFunction<T> {
+       private static final long serialVersionUID = -2109562093871155005L;
+
+       private final FilterFunction<T> left;
+       private final FilterFunction<T> right;
+
+       /**
+        * @param left Filter which is evaluated first
+        * @param right Filter which is evaluated only if the left filter returned true
+        */
+       public AndFilterFunction(final FilterFunction<T> left, final FilterFunction<T> right) {
+               this.left = left;
+               this.right = right;
+       }
+
+       @Override
+       public boolean filter(T value) throws Exception {
+               if (!left.filter(value)) {
+                       // rejected by the left filter, the right filter is not asked anymore
+                       return false;
+               }
+
+               return right.filter(value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
new file mode 100644
index 0000000..266451c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * Pattern operator which signifies that there is a non-strict temporal contiguity between
+ * itself and its preceding pattern operator. This means that there might be events in between
+ * two matching events. These events are then simply ignored.
+ *
+ * @param <T> Base type of the events
+ * @param <F> Subtype of T to which the operator is currently constrained
+ */
+public class FollowedByPattern<T, F extends T> extends Pattern<T, F> {
+       /**
+        * Package private constructor which passes both arguments on to {@link Pattern}.
+        *
+        * @param name Name of the pattern operator
+        * @param previous Preceding pattern operator
+        */
+       FollowedByPattern(final String name, Pattern<T, ?> previous) {
+               super(name, previous);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
new file mode 100644
index 0000000..76f660a
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * Base class for a pattern definition.
+ * <p>
+ * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+ * a {@link NFA}. A pattern is a chain of pattern operators where every operator knows its
+ * predecessor.
+ *
+ * <pre>{@code
+ * Pattern<T, ?> pattern = Pattern.<T>begin("start")
+ *   .next("middle").subtype(F.class)
+ *   .followedBy("end").where(new MyFilterFunction());
+ * }
+ * </pre>
+ *
+ * @param <T> Base type of the elements appearing in the pattern
+ * @param <F> Subtype of T to which the current pattern operator is constrained
+ */
+public class Pattern<T, F extends T> {
+
+       // name of the pattern operator
+       private final String name;
+
+       // previous pattern operator
+       private final Pattern<T, ?> previous;
+
+       // filter condition for an event to be matched
+       private FilterFunction<F> filterFunction;
+
+       // window length in which the pattern match has to occur
+       private Time windowTime;
+
+       /**
+        * @param name Name of the pattern operator
+        * @param previous Preceding pattern operator; {@link #begin(String)} passes null here
+        */
+       protected Pattern(final String name, final Pattern<T, ?> previous) {
+               this.name = name;
+               this.previous = previous;
+       }
+
+       /**
+        * @return Name of this pattern operator
+        */
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * @return Preceding pattern operator
+        */
+       public Pattern<T, ?> getPrevious() {
+               return previous;
+       }
+
+       /**
+        * @return Filter condition of this pattern operator, null if none has been set
+        */
+       public FilterFunction<F> getFilterFunction() {
+               return filterFunction;
+       }
+
+       /**
+        * @return Window time set via {@link #within(Time)}, null if none has been set
+        */
+       public Time getWindowTime() {
+               return windowTime;
+       }
+
+       /**
+        * Adds a filter condition which an event has to fulfill in order to be matched. If a
+        * condition has been set before, then the new condition is and-ed to it.
+        *
+        * @param newFilterFunction Filter condition
+        * @return The same pattern operator where the new filter condition is set
+        */
+       public Pattern<T, F> where(FilterFunction<F> newFilterFunction) {
+               ClosureCleaner.clean(newFilterFunction, true);
+
+               // the first condition is taken as it is, every further one is combined with the
+               // already existing condition
+               this.filterFunction = (this.filterFunction == null)
+                       ? newFilterFunction
+                       : new AndFilterFunction<F>(this.filterFunction, newFilterFunction);
+
+               return this;
+       }
+
+       /**
+        * Constrains the current pattern operator to a subtype. An event is only matched if it is
+        * of the given subtype. An already existing filter condition is kept and and-ed with the
+        * subtype constraint.
+        *
+        * @param subtypeClass Class of the subtype
+        * @param <S> Type of the subtype
+        * @return The same pattern operator with the new subtype constraint
+        */
+       public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
+               final FilterFunction<F> subtypeFilter = new SubtypeFilterFunction<F>(subtypeClass);
+
+               if (this.filterFunction != null) {
+                       this.filterFunction = new AndFilterFunction<F>(this.filterFunction, subtypeFilter);
+               } else {
+                       this.filterFunction = subtypeFilter;
+               }
+
+               // the very same instance is returned, only seen through the narrowed type
+               @SuppressWarnings("unchecked")
+               final Pattern<T, S> narrowed = (Pattern<T, S>) this;
+
+               return narrowed;
+       }
+
+       /**
+        * Defines the maximum time interval for a matching pattern. This means that the time gap
+        * between first and the last event must not be longer than the window time. A null
+        * argument is ignored and leaves a previously set window time untouched.
+        *
+        * @param windowTime Time of the matching window
+        * @return The same pattern operator with the new window length
+        */
+       public Pattern<T, F> within(Time windowTime) {
+               if (windowTime != null) {
+                       this.windowTime = windowTime;
+               }
+
+               return this;
+       }
+
+       /**
+        * Appends a new pattern operator which enforces strict temporal contiguity: the whole
+        * pattern only matches if an event which matches the new operator directly follows the
+        * preceding matching event. Thus, there cannot be any events in between two matching
+        * events.
+        *
+        * @param name Name of the new pattern operator
+        * @return A new pattern operator which is appended to this pattern operator
+        */
+       public Pattern<T, T> next(final String name) {
+               return new Pattern<T, T>(name, this);
+       }
+
+       /**
+        * Appends a new pattern operator which enforces non-strict temporal contiguity: a matching
+        * event of the new operator and the preceding matching event might be interleaved with
+        * other events which are ignored.
+        *
+        * @param name Name of the new pattern operator
+        * @return A new pattern operator which is appended to this pattern operator
+        */
+       public FollowedByPattern<T, T> followedBy(final String name) {
+               return new FollowedByPattern<T, T>(name, this);
+       }
+
+       /**
+        * Starts a new pattern with the initial pattern operator whose name is provided.
+        * Furthermore, the base type of the event sequence is set.
+        *
+        * @param name Name of the new pattern operator
+        * @param <X> Base type of the event pattern
+        * @return The first pattern operator of a pattern
+        */
+       public static <X> Pattern<X, X> begin(final String name) {
+               return new Pattern<X, X>(name, null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
new file mode 100644
index 0000000..f183f0f
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which only accepts elements of the given subtype. An element is filtered
+ * out iff it is not an instance of the given subtype of T. This includes null, which is not
+ * an instance of any type.
+ *
+ * @param <T> Type of the elements to be filtered
+ */
+public class SubtypeFilterFunction<T> implements FilterFunction<T> {
+       private static final long serialVersionUID = -2990017519957561355L;
+
+       // subtype to filter for
+       private final Class<? extends T> subtype;
+
+       /**
+        * @param subtype Class which an element has to be an instance of in order to be accepted
+        */
+       public SubtypeFilterFunction(final Class<? extends T> subtype) {
+               this.subtype = subtype;
+       }
+
+       /**
+        * @param value Element to check
+        * @return true iff the element is an instance of the subtype
+        */
+       @Override
+       public boolean filter(T value) throws Exception {
+               // same result as subtype.isAssignableFrom(value.getClass()) for non-null values, but
+               // a null value is rejected instead of causing a NullPointerException
+               return subtype.isInstance(value);
+       }
+}

Reply via email to