Repository: flink
Updated Branches:
  refs/heads/master 682d8d5e2 -> 79058edb6


http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
new file mode 100644
index 0000000..7dcda4c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Map;
+
+/**
+ * Integration tests that run CEP patterns on small in-memory streams, write the selected
+ * matches to a temporary file and compare the file content with the expected result.
+ *
+ * <p>Every test looks for a "start" event followed by a "middle" event followed by an "end"
+ * event and reports each match as the comma-separated ids of the three matched events.
+ */
+public class CEPITCase extends StreamingMultipleProgramsTestBase {
+
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+               expected = "";
+       }
+
+       /**
+        * Compares the lines written by the test with the lines in {@code expected}.
+        */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       /**
+        * Checks that a certain event sequence is recognized.
+        *
+        * @throws Exception if the job execution fails
+        */
+       @Test
+       public void testSimplePatternCEP() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Event> input = env.fromElements(
+                       new Event(1, "barfoo", 1.0),
+                       new Event(2, "start", 2.0),
+                       new Event(3, "foobar", 3.0),
+                       new SubEvent(4, "foo", 4.0, 1.0),
+                       new Event(5, "middle", 5.0),
+                       new SubEvent(6, "middle", 6.0, 2.0),
+                       new SubEvent(7, "bar", 3.0, 3.0),
+                       new Event(42, "42", 42.0),
+                       new Event(8, "end", 1.0)
+               );
+
+               DataStream<String> result = CEP.pattern(input, subEventMiddlePattern())
+                       .select(new MatchedIdsSelectFunction());
+
+               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               // expected sequence of matching event ids
+               expected = "2,6,8";
+
+               env.execute();
+       }
+
+       /**
+        * Checks the same pattern on a stream keyed by event id with parallelism 2.
+        *
+        * @throws Exception if the job execution fails
+        */
+       @Test
+       public void testSimpleKeyedPatternCEP() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(2);
+
+               DataStream<Event> input = env.fromElements(
+                       new Event(1, "barfoo", 1.0),
+                       new Event(2, "start", 2.0),
+                       new Event(3, "start", 2.1),
+                       new Event(3, "foobar", 3.0),
+                       new SubEvent(4, "foo", 4.0, 1.0),
+                       new SubEvent(3, "middle", 3.2, 1.0),
+                       new Event(42, "start", 3.1),
+                       new SubEvent(42, "middle", 3.3, 1.2),
+                       new Event(5, "middle", 5.0),
+                       new SubEvent(2, "middle", 6.0, 2.0),
+                       new SubEvent(7, "bar", 3.0, 3.0),
+                       new Event(42, "42", 42.0),
+                       new Event(3, "end", 2.0),
+                       new Event(2, "end", 1.0),
+                       new Event(42, "end", 42.0)
+               ).keyBy(new EventIdKeySelector());
+
+               DataStream<String> result = CEP.pattern(input, subEventMiddlePattern())
+                       .select(new MatchedIdsSelectFunction());
+
+               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               // the expected sequences of matching event ids
+               expected = "2,2,2\n3,3,3\n42,42,42";
+
+               env.execute();
+       }
+
+       /**
+        * Checks the pattern in event time on a stream whose elements arrive out of timestamp order.
+        *
+        * @throws Exception if the job execution fails
+        */
+       @Test
+       public void testSimplePatternEventTime() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               // (Event, timestamp)
+               DataStream<Event> input = env.fromElements(
+                       Tuple2.of(new Event(1, "start", 1.0), 5L),
+                       Tuple2.of(new Event(2, "middle", 2.0), 1L),
+                       Tuple2.of(new Event(3, "end", 3.0), 3L),
+                       Tuple2.of(new Event(4, "end", 4.0), 10L),
+                       Tuple2.of(new Event(5, "middle", 5.0), 7L)
+               ).assignTimestamps(new LaggingWatermarkExtractor())
+                       .map(new EventProjection());
+
+               DataStream<String> result = CEP.pattern(input, plainMiddlePattern())
+                       .select(new MatchedIdsSelectFunction());
+
+               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               // the expected sequence of matching event ids
+               expected = "1,5,4";
+
+               env.execute();
+       }
+
+       /**
+        * Checks the pattern in event time on a stream keyed by event id with parallelism 2.
+        *
+        * @throws Exception if the job execution fails
+        */
+       @Test
+       public void testSimpleKeyedPatternEventTime() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(2);
+
+               // (Event, timestamp)
+               DataStream<Event> input = env.fromElements(
+                       Tuple2.of(new Event(1, "start", 1.0), 5L),
+                       Tuple2.of(new Event(1, "middle", 2.0), 1L),
+                       Tuple2.of(new Event(2, "middle", 2.0), 4L),
+                       Tuple2.of(new Event(2, "start", 2.0), 3L),
+                       Tuple2.of(new Event(1, "end", 3.0), 3L),
+                       Tuple2.of(new Event(3, "start", 4.1), 5L),
+                       Tuple2.of(new Event(1, "end", 4.0), 10L),
+                       Tuple2.of(new Event(2, "end", 2.0), 8L),
+                       Tuple2.of(new Event(1, "middle", 5.0), 7L),
+                       Tuple2.of(new Event(3, "middle", 6.0), 9L),
+                       Tuple2.of(new Event(3, "end", 7.0), 7L)
+               ).assignTimestamps(new LaggingWatermarkExtractor())
+                       .map(new EventProjection())
+                       .keyBy(new EventIdKeySelector());
+
+               DataStream<String> result = CEP.pattern(input, plainMiddlePattern())
+                       .select(new MatchedIdsSelectFunction());
+
+               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               // the expected sequences of matching event ids
+               expected = "1,1,1\n2,2,2";
+
+               env.execute();
+       }
+
+       /**
+        * Builds the pattern "start" followed by "middle" followed by "end" in which the
+        * "middle" event must additionally be a {@link SubEvent}.
+        */
+       private static Pattern<Event, ?> subEventMiddlePattern() {
+               return Pattern.<Event>begin("start").where(new NameFilter<Event>("start"))
+                       .followedBy("middle").subtype(SubEvent.class).where(new NameFilter<SubEvent>("middle"))
+                       .followedBy("end").where(new NameFilter<Event>("end"));
+       }
+
+       /**
+        * Builds the pattern "start" followed by "middle" followed by "end" with no subtype
+        * restriction on any of the three events.
+        */
+       private static Pattern<Event, ?> plainMiddlePattern() {
+               return Pattern.<Event>begin("start").where(new NameFilter<Event>("start"))
+                       .followedBy("middle").where(new NameFilter<Event>("middle"))
+                       .followedBy("end").where(new NameFilter<Event>("end"));
+       }
+
+       /**
+        * Accepts events whose name equals the configured name.
+        *
+        * <p>Declared as a static nested class so that it does not hold a reference to the
+        * enclosing test instance.
+        */
+       private static final class NameFilter<T extends Event> implements FilterFunction<T> {
+               private static final long serialVersionUID = 5681493970790509488L;
+
+               private final String expectedName;
+
+               NameFilter(String expectedName) {
+                       this.expectedName = expectedName;
+               }
+
+               @Override
+               public boolean filter(T value) throws Exception {
+                       return value.getName().equals(expectedName);
+               }
+       }
+
+       /**
+        * Renders a match as "startId,middleId,endId".
+        */
+       private static final class MatchedIdsSelectFunction implements PatternSelectFunction<Event, String> {
+               private static final long serialVersionUID = 1447462674590806097L;
+
+               @Override
+               public String select(Map<String, Event> pattern) {
+                       return pattern.get("start").getId() + ","
+                               + pattern.get("middle").getId() + ","
+                               + pattern.get("end").getId();
+               }
+       }
+
+       /**
+        * Keys events by their id.
+        */
+       private static final class EventIdKeySelector implements KeySelector<Event, Integer> {
+               private static final long serialVersionUID = -2112041392652797483L;
+
+               @Override
+               public Integer getKey(Event value) throws Exception {
+                       return value.getId();
+               }
+       }
+
+       /**
+        * Uses the second tuple field as the element timestamp and returns, from
+        * {@link #extractWatermark}, the highest timestamp seen so far minus {@link #WATERMARK_LAG}.
+        */
+       private static final class LaggingWatermarkExtractor implements TimestampExtractor<Tuple2<Event, Long>> {
+               private static final long serialVersionUID = 878281782188702293L;
+
+               /** Distance by which the returned watermark trails the highest seen timestamp. */
+               private static final long WATERMARK_LAG = 5L;
+
+               // primitive on purpose: avoids unboxing on every comparison
+               private long currentMaxTimestamp = Long.MIN_VALUE;
+
+               @Override
+               public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
+                       long timestamp = element.f1;
+
+                       if (currentMaxTimestamp < timestamp) {
+                               currentMaxTimestamp = timestamp;
+                       }
+
+                       return timestamp;
+               }
+
+               @Override
+               public long extractWatermark(Tuple2<Event, Long> element, long currentTimestamp) {
+                       return currentMaxTimestamp - WATERMARK_LAG;
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       return Long.MIN_VALUE;
+               }
+       }
+
+       /**
+        * Drops the timestamp field and forwards only the event.
+        */
+       private static final class EventProjection implements MapFunction<Tuple2<Event, Long>, Event> {
+               private static final long serialVersionUID = -5288731103938665328L;
+
+               @Override
+               public Event map(Tuple2<Event, Long> value) throws Exception {
+                       return value.f0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
new file mode 100644
index 0000000..efe56b7
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.Objects;
+
+/**
+ * Simple event used by the CEP tests, consisting of an id, a name and a price.
+ */
+public class Event {
+       private String name;
+       private double price;
+       private int id;
+
+       /**
+        * Creates an event.
+        *
+        * @param id id of the event
+        * @param name name of the event
+        * @param price price of the event
+        */
+       public Event(int id, String name, double price) {
+               this.id = id;
+               this.name = name;
+               this.price = price;
+       }
+
+       public double getPrice() {
+               return price;
+       }
+
+       public int getId() {
+               return id;
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       @Override
+       public String toString() {
+               return "Event(" + id + ", " + name + ", " + price + ")";
+       }
+
+       /**
+        * Two events are equal if their name, price and id are equal.
+        *
+        * <p>The price is compared with {@link Double#compare(double, double)} rather than
+        * {@code ==} so that the result agrees with {@link #hashCode()}, which hashes the boxed
+        * double: {@code 0.0} and {@code -0.0} hash differently and are therefore unequal, and a
+        * NaN price is equal to itself. The name is compared null-safely because
+        * {@link #hashCode()} accepts a null name as well.
+        */
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof Event) {
+                       Event other = (Event) obj;
+
+                       return Objects.equals(name, other.name)
+                               && Double.compare(price, other.price) == 0
+                               && id == other.id;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(name, price, id);
+       }
+
+       /**
+        * Creates a serializer for {@link Event} from the type information extracted for this
+        * class, using a new {@link ExecutionConfig}.
+        *
+        * @return serializer for events
+        */
+       public static TypeSerializer<Event> createTypeSerializer() {
+               TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+               return typeInformation.createSerializer(new ExecutionConfig());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
new file mode 100644
index 0000000..adb9446
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+/**
+ * Immutable pair of an event and the timestamp associated with it.
+ *
+ * @param <T> type of the wrapped event
+ */
+public class StreamEvent<T> {
+       private final T event;
+       private final long timestamp;
+
+       /**
+        * Convenience factory equivalent to calling the constructor.
+        *
+        * @param event event to wrap
+        * @param timestamp timestamp to associate with the event
+        * @param <V> type of the wrapped event
+        * @return a new stream event holding both values
+        */
+       public static <V> StreamEvent<V> of(V event, long timestamp) {
+               StreamEvent<V> wrapped = new StreamEvent<V>(event, timestamp);
+               return wrapped;
+       }
+
+       /**
+        * @param event event to wrap
+        * @param timestamp timestamp to associate with the event
+        */
+       public StreamEvent(T event, long timestamp) {
+               this.timestamp = timestamp;
+               this.event = event;
+       }
+
+       /** Returns the wrapped event. */
+       public T getEvent() {
+               return this.event;
+       }
+
+       /** Returns the timestamp given at construction. */
+       public long getTimestamp() {
+               return this.timestamp;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
new file mode 100644
index 0000000..31eff28
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+/**
+ * Event that carries a volume in addition to the id, name and price of {@link Event}.
+ */
+public class SubEvent extends Event {
+       private final double volume;
+
+       /**
+        * @param id id passed on to {@link Event}
+        * @param name name passed on to {@link Event}
+        * @param price price passed on to {@link Event}
+        * @param volume volume of this event
+        */
+       public SubEvent(int id, String name, double price, double volume) {
+               super(id, name, price);
+               this.volume = volume;
+       }
+
+       public double getVolume() {
+               return volume;
+       }
+
+       /**
+        * Formats the event as {@code SubEvent(id, name, price, volume)}.
+        */
+       @Override
+       public String toString() {
+               return "SubEvent(" + getId() + ", " + getName() + ", " + getPrice() + ", " + getVolume() + ")";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
new file mode 100644
index 0000000..8bc010a
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DeweyNumber}.
+ */
+public class DeweyNumberTest extends TestLogger {
+
+       /**
+        * Derives a handful of Dewey numbers from "1" through {@code increase()} and
+        * {@code addStage()} and checks both their values and their mutual compatibility.
+        */
+       @Test
+       public void testDeweyNumberGeneration() {
+               // each local is named after the value it is expected to hold
+               DeweyNumber one = new DeweyNumber(1);
+               DeweyNumber two = one.increase();
+               DeweyNumber twoZero = two.addStage();
+               DeweyNumber oneZero = one.addStage();
+               DeweyNumber oneOne = oneZero.increase();
+               DeweyNumber oneOneZero = oneOne.addStage();
+
+               // generated values
+               assertEquals(DeweyNumber.fromString("1"), one);
+               assertEquals(DeweyNumber.fromString("2"), two);
+               assertEquals(DeweyNumber.fromString("2.0"), twoZero);
+               assertEquals(DeweyNumber.fromString("1.0"), oneZero);
+               assertEquals(DeweyNumber.fromString("1.1"), oneOne);
+               assertEquals(DeweyNumber.fromString("1.1.0"), oneOneZero);
+
+               // pairs expected to be compatible
+               assertTrue(oneZero.isCompatibleWith(one));
+               assertTrue(oneOne.isCompatibleWith(oneZero));
+               assertTrue(oneOneZero.isCompatibleWith(oneOne));
+
+               // pairs expected to be incompatible
+               assertFalse(oneOneZero.isCompatibleWith(oneZero));
+               assertFalse(twoZero.isCompatibleWith(oneZero));
+               assertFalse(oneZero.isCompatibleWith(twoZero));
+               assertFalse(oneOne.isCompatibleWith(oneOneZero));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
new file mode 100644
index 0000000..a46a81e
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.StreamEvent;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class NFAITCase extends TestLogger {
+
+       /**
+        * Feeds a short event sequence through an NFA compiled from a three-stage
+        * "start" -> "middle" -> "end" pattern and expects exactly one match consisting of
+        * {@code startEvent}, {@code middleEvent} and {@code endEvent}.
+        */
+       @Test
+       public void testSimplePatternNFA() {
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+               final Event endEvent = new Event(43, "end", 1.0);
+
+               // The three events above, interleaved with events that must not show up in the match.
+               final List<StreamEvent<Event>> inputEvents = new ArrayList<>();
+               inputEvents.add(new StreamEvent<Event>(startEvent, 1));
+               inputEvents.add(new StreamEvent<Event>(new Event(43, "foobar", 1.0), 2));
+               inputEvents.add(new StreamEvent<Event>(new SubEvent(41, "barfoo", 1.0, 5.0), 3));
+               inputEvents.add(new StreamEvent<Event>(middleEvent, 3));
+               inputEvents.add(new StreamEvent<Event>(new Event(43, "start", 1.0), 4));
+               inputEvents.add(new StreamEvent<Event>(endEvent, 5));
+
+               // Accepts events named "start".
+               final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               };
+
+               // Accepts sub events whose volume exceeds 5.0.
+               final FilterFunction<SubEvent> middleFilter = new FilterFunction<SubEvent>() {
+                       private static final long serialVersionUID = 6215754202506583964L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return value.getVolume() > 5.0;
+                       }
+               };
+
+               // Accepts events named "end".
+               final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 7056763917392056548L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(startFilter)
+                       .followedBy("middle").subtype(SubEvent.class).where(middleFilter)
+                       .followedBy("end").where(endFilter);
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+
+               // Collect every match the NFA reports while consuming the input in order.
+               final List<Map<String, Event>> matches = new ArrayList<>();
+               for (StreamEvent<Event> input : inputEvents) {
+                       matches.addAll(nfa.process(input.getEvent(), input.getTimestamp()));
+               }
+
+               assertEquals(1, matches.size());
+
+               final Map<String, Event> match = matches.get(0);
+               assertEquals(startEvent, match.get("start"));
+               assertEquals(middleEvent, match.get("middle"));
+               assertEquals(endEvent, match.get("end"));
+       }
+
+       /**
+        * Tests that the NFA successfully filters out expired elements with respect to the
+        * window length.
+        *
+        * <p>The pattern is restricted to 10 milliseconds. The input holds two "start" events
+        * (timestamps 1 and 2) and two "end" events (timestamps 12 and 13); exactly one match
+        * is expected, made of the events with timestamps 2, 3 and 12.
+        */
+       @Test
+       public void testSimplePatternWithTimeWindowNFA() {
+               final Event startEvent = new Event(2, "start", 1.0);
+               final Event middleEvent = new Event(3, "middle", 1.0);
+               final Event endEvent = new Event(5, "end", 1.0);
+
+               final List<StreamEvent<Event>> events = new ArrayList<>();
+               events.add(new StreamEvent<Event>(new Event(1, "start", 1.0), 1));
+               events.add(new StreamEvent<Event>(startEvent, 2));
+               events.add(new StreamEvent<Event>(middleEvent, 3));
+               events.add(new StreamEvent<Event>(new Event(4, "foobar", 1.0), 4));
+               events.add(new StreamEvent<Event>(endEvent, 12));
+               events.add(new StreamEvent<Event>(new Event(6, "end", 1.0), 13));
+
+               // Accepts events named "start".
+               final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 7907391379273505897L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               };
+
+               // Accepts events named "middle".
+               final FilterFunction<Event> middleFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -3268741540234334074L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("middle");
+                       }
+               };
+
+               // Accepts events named "end".
+               final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -8995174172182138608L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(startFilter)
+                       .followedBy("middle").where(middleFilter)
+                       .followedBy("end").where(endFilter)
+                       .within(Time.milliseconds(10));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+
+               final List<Map<String, Event>> resultingPatterns = new ArrayList<>();
+               for (StreamEvent<Event> current : events) {
+                       resultingPatterns.addAll(nfa.process(current.getEvent(), current.getTimestamp()));
+               }
+
+               assertEquals(1, resultingPatterns.size());
+
+               final Map<String, Event> match = resultingPatterns.get(0);
+               assertEquals(startEvent, match.get("start"));
+               assertEquals(middleEvent, match.get("middle"));
+               assertEquals(endEvent, match.get("end"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
new file mode 100644
index 0000000..3face76
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.StreamEvent;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class NFATest extends TestLogger {
+       /**
+        * Assembles an NFA by hand (start state -> "start" -> "end") and checks that both
+        * "start" events of the input are matched with the single "end" event.
+        */
+       @Test
+       public void testSimpleNFA() {
+               // Condition guarding the transition into the "start" state.
+               final FilterFunction<Event> isStart = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -4869589195918650396L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               };
+
+               // Condition guarding the transition into the final "end" state.
+               final FilterFunction<Event> isEnd = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 2979804163709590673L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               final State<Event> startingState = new State<>("", State.StateType.Start);
+               final State<Event> startState = new State<>("start", State.StateType.Normal);
+               final State<Event> endState = new State<>("end", State.StateType.Final);
+
+               // Transitions are registered in the same order as before: TAKE to "end" first,
+               // then the unconditional IGNORE self-loop on "start".
+               startingState.addStateTransition(
+                       new StateTransition<Event>(StateTransitionAction.TAKE, startState, isStart));
+               startState.addStateTransition(
+                       new StateTransition<Event>(StateTransitionAction.TAKE, endState, isEnd));
+               startState.addStateTransition(
+                       new StateTransition<Event>(StateTransitionAction.IGNORE, startState, null));
+
+               final NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+               nfa.addState(startingState);
+               nfa.addState(startState);
+               nfa.addState(endState);
+
+               final List<StreamEvent<Event>> streamEvents = new ArrayList<>();
+               streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+               streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L));
+               streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L));
+               streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L));
+
+               final Map<String, Event> matchFromFirstStart = new HashMap<>();
+               matchFromFirstStart.put("start", new Event(1, "start", 1.0));
+               matchFromFirstStart.put("end", new Event(4, "end", 4.0));
+
+               final Map<String, Event> matchFromSecondStart = new HashMap<>();
+               matchFromSecondStart.put("start", new Event(3, "start", 3.0));
+               matchFromSecondStart.put("end", new Event(4, "end", 4.0));
+
+               final Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+               expectedPatterns.add(matchFromFirstStart);
+               expectedPatterns.add(matchFromSecondStart);
+
+               final Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+
+               assertEquals(expectedPatterns, actualPatterns);
+       }
+
+       /**
+        * Same hand-built NFA as in {@code testSimpleNFA}, but constructed with 2 as the second
+        * constructor argument (presumably the window length - confirm against {@code NFA}).
+        * Only the match beginning at the "start" event with timestamp 3 is expected.
+        */
+       @Test
+       public void testTimeoutWindowPruning() {
+               final List<StreamEvent<Event>> streamEvents = new ArrayList<>();
+               streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
+               streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L));
+               streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L));
+               streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L));
+
+               final State<Event> startingState = new State<>("", State.StateType.Start);
+               final State<Event> startState = new State<>("start", State.StateType.Normal);
+               final State<Event> endState = new State<>("end", State.StateType.Final);
+
+               // Condition guarding the transition into the "start" state.
+               final FilterFunction<Event> startCondition = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -4869589195918650396L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               };
+
+               // Condition guarding the transition into the final "end" state.
+               final FilterFunction<Event> endCondition = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 2979804163709590673L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               final StateTransition<Event> starting2Start =
+                       new StateTransition<Event>(StateTransitionAction.TAKE, startState, startCondition);
+               final StateTransition<Event> start2End =
+                       new StateTransition<Event>(StateTransitionAction.TAKE, endState, endCondition);
+               // Unconditional self-loop that ignores the current event.
+               final StateTransition<Event> start2Start =
+                       new StateTransition<Event>(StateTransitionAction.IGNORE, startState, null);
+
+               startingState.addStateTransition(starting2Start);
+               startState.addStateTransition(start2End);
+               startState.addStateTransition(start2Start);
+
+               final NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 2);
+               nfa.addState(startingState);
+               nfa.addState(startState);
+               nfa.addState(endState);
+
+               final Map<String, Event> survivingMatch = new HashMap<>();
+               survivingMatch.put("start", new Event(3, "start", 3.0));
+               survivingMatch.put("end", new Event(4, "end", 4.0));
+
+               final Set<Map<String, Event>> expectedPatterns = new HashSet<>();
+               expectedPatterns.add(survivingMatch);
+
+               final Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents);
+
+               assertEquals(expectedPatterns, actualPatterns);
+       }
+
+       /**
+        * Checks the names produced by {@code NFA.generateStateName} for three inputs:
+        * a name ending in "[]", a plain name, and a name ending in "[][]".
+        */
+       @Test
+       public void testStateNameGeneration() {
+               // Generate all names first, then compare them against the expected literals.
+               final String fromBracketName = NFA.generateStateName("a[]", 2);
+               final String fromPlainName = NFA.generateStateName("a", 3);
+               final String fromDoubleBracketName = NFA.generateStateName("a[][]", 42);
+
+               assertEquals("a[2]", fromBracketName);
+               assertEquals("a_3", fromPlainName);
+               assertEquals("a[][42]", fromDoubleBracketName);
+       }
+
+       /**
+        * Feeds every input event, together with its timestamp, to the given NFA and gathers
+        * all matches the NFA returns.
+        *
+        * @param nfa the automaton to drive
+        * @param inputs the events to process, in iteration order
+        * @return the matches returned by the NFA, collected in a set (equal matches collapse)
+        */
+       public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamEvent<T>> inputs) {
+               final Set<Map<String, T>> collected = new HashSet<>();
+
+               for (StreamEvent<T> input : inputs) {
+                       collected.addAll(nfa.process(input.getEvent(), input.getTimestamp()));
+               }
+
+               return collected;
+       }
+
+       /**
+        * Serializes a hand-built NFA with Java serialization, reads it back and checks that
+        * the copy equals the original.
+        *
+        * @throws IOException if writing to or reading from the in-memory streams fails
+        * @throws ClassNotFoundException if a serialized class cannot be resolved on read
+        */
+       @Test
+       public void testNFASerialization() throws IOException, ClassNotFoundException {
+               NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+
+               State<Event> startingState = new State<>("", State.StateType.Start);
+               State<Event> startState = new State<>("start", State.StateType.Normal);
+               State<Event> endState = new State<>("end", State.StateType.Final);
+
+               StateTransition<Event> starting2Start = new StateTransition<>(
+                       StateTransitionAction.TAKE,
+                       startState,
+                       new NameFilter("start"));
+
+               StateTransition<Event> start2End = new StateTransition<>(
+                       StateTransitionAction.TAKE,
+                       endState,
+                       new NameFilter("end"));
+
+               // Unconditional self-loop that ignores the current event.
+               StateTransition<Event> start2Start = new StateTransition<>(
+                       StateTransitionAction.IGNORE,
+                       startState,
+                       null);
+
+               startingState.addStateTransition(starting2Start);
+               startState.addStateTransition(start2End);
+               startState.addStateTransition(start2Start);
+
+               nfa.addState(startingState);
+               nfa.addState(startState);
+               nfa.addState(endState);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+               // Closing the object stream flushes any buffered data into baos before the
+               // bytes are read back below.
+               try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                       oos.writeObject(nfa);
+               }
+
+               final NFA<Event> copy;
+
+               try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                       @SuppressWarnings("unchecked")
+                       NFA<Event> deserialized = (NFA<Event>) ois.readObject();
+                       copy = deserialized;
+               }
+
+               assertEquals(nfa, copy);
+       }
+
+       /**
+        * Filter that accepts an event when its name equals the name given at construction.
+        * Declared as a static nested class and used by the serialization test in place of an
+        * anonymous filter.
+        */
+       private static class NameFilter implements FilterFunction<Event> {
+
+               private static final long serialVersionUID = 7472112494752423802L;
+
+               /** Name an event must carry to pass the filter. */
+               private final String name;
+
+               public NameFilter(final String name) {
+                       this.name = name;
+               }
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       final String eventName = value.getName();
+
+                       return eventName.equals(name);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
new file mode 100644
index 0000000..25618d5
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.cep.Event;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SharedBufferTest extends TestLogger {
+
+       /**
+        * Fills a shared buffer with a fixed sequence of entries, extracts the patterns ending
+        * in the "b" entries and checks them against three expected multimaps. Also checks
+        * that extraction after removal yields nothing and that the buffer ends up empty.
+        */
+       @Test
+       public void testSharedBuffer() {
+               SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+               final long timestamp = 1L;
+               final int numberEvents = 8;
+               final Event[] events = new Event[numberEvents];
+
+               for (int idx = 0; idx < events.length; idx++) {
+                       final int id = idx + 1;
+                       events[idx] = new Event(id, "e" + id, idx);
+               }
+
+               // Expected pattern 1: a1 = e3, a[] = e4, b = e6.
+               LinkedHashMultimap<String, Event> expectedPattern1 = LinkedHashMultimap.create();
+               expectedPattern1.put("a1", events[2]);
+               expectedPattern1.put("a[]", events[3]);
+               expectedPattern1.put("b", events[5]);
+
+               // Expected pattern 2: a1 = e1, a[] = e2..e5, b = e6.
+               LinkedHashMultimap<String, Event> expectedPattern2 = LinkedHashMultimap.create();
+               expectedPattern2.put("a1", events[0]);
+               for (int idx = 1; idx <= 4; idx++) {
+                       expectedPattern2.put("a[]", events[idx]);
+               }
+               expectedPattern2.put("b", events[5]);
+
+               // Expected pattern 3: a1 = e1, a[] = e2..e7, b = e8.
+               LinkedHashMultimap<String, Event> expectedPattern3 = LinkedHashMultimap.create();
+               expectedPattern3.put("a1", events[0]);
+               for (int idx = 1; idx <= 6; idx++) {
+                       expectedPattern3.put("a[]", events[idx]);
+               }
+               expectedPattern3.put("b", events[7]);
+
+               // The order of the following buffer operations is significant and left unchanged.
+               sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1"));
+               sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2"));
+               sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0"));
+               sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1"));
+               sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0"));
+               sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0"));
+               sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1"));
+               sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
+
+               Collection<LinkedHashMultimap<String, Event>> patterns3 =
+                       sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+               sharedBuffer.remove("b", events[7], timestamp);
+               // Same extraction again, after the entry has been removed.
+               Collection<LinkedHashMultimap<String, Event>> patterns4 =
+                       sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0"));
+               Collection<LinkedHashMultimap<String, Event>> patterns1 =
+                       sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0"));
+               Collection<LinkedHashMultimap<String, Event>> patterns2 =
+                       sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0"));
+               sharedBuffer.remove("b", events[5], timestamp);
+
+               assertTrue(sharedBuffer.isEmpty());
+               assertTrue(patterns4.isEmpty());
+               assertEquals(Collections.singletonList(expectedPattern1), patterns1);
+               assertEquals(Collections.singletonList(expectedPattern2), patterns2);
+               assertEquals(Collections.singletonList(expectedPattern3), patterns3);
+       }
+
+       /**
+        * Fills a shared buffer with a fixed sequence of entries, serializes it with Java
+        * serialization, reads it back and checks that the copy equals the original.
+        *
+        * @throws IOException if writing to or reading from the in-memory streams fails
+        * @throws ClassNotFoundException if a serialized class cannot be resolved on read
+        */
+       @Test
+       public void testSharedBufferSerialization() throws IOException, ClassNotFoundException {
+               SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
+               int numberEvents = 8;
+               Event[] events = new Event[numberEvents];
+               final long timestamp = 1L;
+
+               for (int i = 0; i < numberEvents; i++) {
+                       events[i] = new Event(i + 1, "e" + (i + 1), i);
+               }
+
+               sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1"));
+               sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2"));
+               sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0"));
+               sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1"));
+               sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0"));
+               sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0"));
+               sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1"));
+               sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0"));
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+               // Closing the object stream flushes any buffered data into baos before the
+               // bytes are read back below.
+               try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                       oos.writeObject(sharedBuffer);
+               }
+
+               final SharedBuffer<String, Event> copy;
+
+               try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                       // The stream was written from a SharedBuffer<String, Event> just above.
+                       @SuppressWarnings("unchecked")
+                       SharedBuffer<String, Event> deserialized = (SharedBuffer<String, Event>) ois.readObject();
+                       copy = deserialized;
+               }
+
+               assertEquals(sharedBuffer, copy);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
new file mode 100644
index 0000000..eb3ead1
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class NFACompilerTest extends TestLogger {
+
+       /**
+        * Tests that the NFACompiler generates the correct NFA from a given Pattern.
+        *
+        * <p>The pattern is "start" followed by "middle" (a SubEvent), directly followed by
+        * "end". The compiled NFA is expected to contain four states: the beginning state plus
+        * one state per pattern stage.
+        */
+       @Test
+       public void testNFACompilerWithSimplePattern() {
+               // Accepts events with a price above 2.
+               final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 3314714776170474221L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getPrice() > 2;
+                       }
+               };
+
+               // Accepts events named "end".
+               final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+                       private static final long serialVersionUID = 3990995859716364087L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               };
+
+               Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+                       .followedBy("middle").subtype(SubEvent.class)
+                       .next("end").where(endFilter);
+
+               TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+
+               NFA<Event> nfa = NFACompiler.<Event>compile(pattern, typeInformation.createSerializer(new ExecutionConfig()));
+
+               final Set<State<Event>> states = nfa.getStates();
+
+               assertEquals(4, states.size());
+
+               // Index the states by name for the lookups below.
+               final Map<String, State<Event>> statesByName = new HashMap<>();
+               for (State<Event> state : states) {
+                       statesByName.put(state.getName(), state);
+               }
+
+               // Beginning state
+               assertTrue(statesByName.containsKey(NFACompiler.BEGINNING_STATE_NAME));
+               final State<Event> beginningState = statesByName.get(NFACompiler.BEGINNING_STATE_NAME);
+
+               assertTrue(beginningState.isStart());
+
+               // "start" state: an IGNORE self-loop and a TAKE transition to "middle"
+               assertTrue(statesByName.containsKey("start"));
+               final State<Event> startState = statesByName.get("start");
+
+               final Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
+               final Map<String, StateTransition<Event>> startTransitionsByTarget = new HashMap<>();
+               for (StateTransition<Event> transition : startTransitions) {
+                       startTransitionsByTarget.put(transition.getTargetState().getName(), transition);
+               }
+
+               assertEquals(2, startTransitionsByTarget.size());
+               assertTrue(startTransitionsByTarget.containsKey("start"));
+
+               final StateTransition<Event> reflexiveTransition = startTransitionsByTarget.get("start");
+               assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+
+               assertTrue(startTransitionsByTarget.containsKey("middle"));
+               final StateTransition<Event> startMiddleTransition = startTransitionsByTarget.get("middle");
+               assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+
+               // "middle" state: a single TAKE transition to "end"
+               assertTrue(statesByName.containsKey("middle"));
+               final State<Event> middleState = statesByName.get("middle");
+
+               final Map<String, StateTransition<Event>> middleTransitionsByTarget = new HashMap<>();
+               for (StateTransition<Event> transition : middleState.getStateTransitions()) {
+                       middleTransitionsByTarget.put(transition.getTargetState().getName(), transition);
+               }
+
+               assertEquals(1, middleTransitionsByTarget.size());
+
+               assertTrue(middleTransitionsByTarget.containsKey("end"));
+               final StateTransition<Event> middleEndTransition = middleTransitionsByTarget.get("end");
+
+               assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
+
+               // "end" state: final, without outgoing transitions
+               assertTrue(statesByName.containsKey("end"));
+               final State<Event> endState = statesByName.get("end");
+
+               assertTrue(endState.isFinal());
+               assertEquals(0, endState.getStateTransitions().size());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
new file mode 100644
index 0000000..2edf005
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class PatternTest extends TestLogger {
+       /**
+        * These tests simply test that the pattern construction completes without failure
+        */
+
+       @Test
+       public void testStrictContiguity() {
+               Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end");
+               Pattern<Object, ?> previous;
+               Pattern<Object, ?> previous2;
+
+               assertNotNull(previous = pattern.getPrevious());
+               assertNotNull(previous2 = previous.getPrevious());
+               assertNull(previous2.getPrevious());
+
+               assertEquals(pattern.getName(), "end");
+               assertEquals(previous.getName(), "next");
+               assertEquals(previous2.getName(), "start");
+       }
+
+       @Test
+       public void testNonStrictContiguity() {
+               Pattern<Object, ?> pattern = Pattern.begin("start").followedBy("next").followedBy("end");
+               Pattern<Object, ?> previous;
+               Pattern<Object, ?> previous2;
+
+               assertNotNull(previous = pattern.getPrevious());
+               assertNotNull(previous2 = previous.getPrevious());
+               assertNull(previous2.getPrevious());
+
+               assertTrue(pattern instanceof FollowedByPattern);
+               assertTrue(previous instanceof FollowedByPattern);
+
+               assertEquals(pattern.getName(), "end");
+               assertEquals(previous.getName(), "next");
+               assertEquals(previous2.getName(), "start");
+       }
+
+       @Test
+       public void testStrictContiguityWithCondition() {
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("next").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -7657256242101104925L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("foobar");
+                       }
+               }).next("end").where(new FilterFunction<Event>() {
+                       private static final long serialVersionUID = -7597452389191504189L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getId() == 42;
+                       }
+               });
+
+               Pattern<Event, ?> previous;
+               Pattern<Event, ?> previous2;
+
+               assertNotNull(previous = pattern.getPrevious());
+               assertNotNull(previous2 = previous.getPrevious());
+               assertNull(previous2.getPrevious());
+
+               assertNotNull(pattern.getFilterFunction());
+               assertNotNull(previous.getFilterFunction());
+               assertNull(previous2.getFilterFunction());
+
+               assertEquals(pattern.getName(), "end");
+               assertEquals(previous.getName(), "next");
+               assertEquals(previous2.getName(), "start");
+       }
+
+       @Test
+       public void testPatternWithSubtyping() {
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).followedBy("end");
+
+               Pattern<Event, ?> previous;
+               Pattern<Event, ?> previous2;
+
+               assertNotNull(previous = pattern.getPrevious());
+               assertNotNull(previous2 = previous.getPrevious());
+               assertNull(previous2.getPrevious());
+
+               assertNotNull(previous.getFilterFunction());
+               assertTrue(previous.getFilterFunction() instanceof SubtypeFilterFunction);
+
+               assertEquals(pattern.getName(), "end");
+               assertEquals(previous.getName(), "subevent");
+               assertEquals(previous2.getName(), "start");
+       }
+
+       @Test
+       public void testPatternWithSubtypingAndFilter() {
+               Pattern<Event, Event> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+                       private static final long serialVersionUID = -4118591291880230304L;
+
+                       @Override
+                       public boolean filter(SubEvent value) throws Exception {
+                               return false;
+                       }
+               }).followedBy("end");
+
+               Pattern<Event, ?> previous;
+               Pattern<Event, ?> previous2;
+
+               assertNotNull(previous = pattern.getPrevious());
+               assertNotNull(previous2 = previous.getPrevious());
+               assertNull(previous2.getPrevious());
+
+               assertTrue(pattern instanceof FollowedByPattern);
+               assertNotNull(previous.getFilterFunction());
+
+               assertEquals(pattern.getName(), "end");
+               assertEquals(previous.getName(), "subevent");
+               assertEquals(previous2.getName(), "start");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index e0fbd49..34153c9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -39,5 +39,6 @@ under the License.
                <module>flink-python</module>
                <module>flink-table</module>
                <module>flink-ml</module>
+               <module>flink-cep</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 1ef3298..96ddda1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -105,7 +105,7 @@ public interface StreamOperator<OUT> extends Serializable {
        /**
         * Restores the operator state, if this operator's execution is recovering from a checkpoint.
         * This method restores the operator state (if the operator is stateful) and the key/value state
-        * (if it had been used and was initialized when the snapshot ocurred).
+        * (if it had been used and was initialized when the snapshot occurred).
         *
         * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
         * and before {@link #open()}.

Reply via email to