Repository: flink Updated Branches: refs/heads/master 682d8d5e2 -> 79058edb6
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java new file mode 100644 index 0000000..7dcda4c --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Map; + +public class CEPITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + expected = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } + + /** + * Checks that a certain event sequence is recognized + * @throws Exception + */ + @Test + public void testSimplePatternCEP() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "barfoo", 1.0), + new Event(2, "start", 2.0), + new Event(3, "foobar", 3.0), + new SubEvent(4, "foo", 4.0, 1.0), + new Event(5, "middle", 5.0), + new SubEvent(6, "middle", 6.0, 2.0), + new SubEvent(7, "bar", 3.0, 3.0), + new Event(42, "42", 42.0), + new Event(8, "end", 1.0) + ); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + 
private static final long serialVersionUID = 5681493970790509488L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where( + new FilterFunction<SubEvent>() { + private static final long serialVersionUID = 448591738315698540L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("middle"); + } + } + ) + .followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 6080276591060431966L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() { + private static final long serialVersionUID = 1447462674590806097L; + + @Override + public String select(Map<String, Event> pattern) { + StringBuilder builder = new StringBuilder(); + + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); + + return builder.toString(); + } + }); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = "2,6,8"; + + env.execute(); + } + + @Test + public void testSimpleKeyedPatternCEP() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataStream<Event> input = env.fromElements( + new Event(1, "barfoo", 1.0), + new Event(2, "start", 2.0), + new Event(3, "start", 2.1), + new Event(3, "foobar", 3.0), + new SubEvent(4, "foo", 4.0, 1.0), + new SubEvent(3, "middle", 3.2, 1.0), + new Event(42, "start", 3.1), + new SubEvent(42, "middle", 3.3, 1.2), + new Event(5, "middle", 5.0), + new SubEvent(2, "middle", 6.0, 2.0), + new SubEvent(7, "bar", 3.0, 3.0), + new Event(42, "42", 
42.0), + new Event(3, "end", 2.0), + new Event(2, "end", 1.0), + new Event(42, "end", 42.0) + ).keyBy(new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -2112041392652797483L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5681493970790509488L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where( + new FilterFunction<SubEvent>() { + private static final long serialVersionUID = 448591738315698540L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getName().equals("middle"); + } + } + ) + .followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 6080276591060431966L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() { + private static final long serialVersionUID = 1447462674590806097L; + + @Override + public String select(Map<String, Event> pattern) { + StringBuilder builder = new StringBuilder(); + + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); + + return builder.toString(); + } + }); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // the expected sequences of matching event ids + expected = "2,2,2\n3,3,3\n42,42,42"; + + env.execute(); + } + + @Test + public void testSimplePatternEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // (Event, timestamp) + DataStream<Event> input = env.fromElements( + Tuple2.of(new Event(1, "start", 1.0), 5L), + Tuple2.of(new Event(2, "middle", 2.0), 1L), + Tuple2.of(new Event(3, "end", 3.0), 3L), + Tuple2.of(new Event(4, "end", 4.0), 10L), + Tuple2.of(new Event(5, "middle", 5.0), 7L) + ).assignTimestamps(new TimestampExtractor<Tuple2<Event, Long>>() { + private static final long serialVersionUID = 878281782188702293L; + + private Long currentMaxTimestamp = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) { + if (currentMaxTimestamp < element.f1) { + currentMaxTimestamp = element.f1; + } + + return element.f1; + } + + @Override + public long extractWatermark(Tuple2<Event, Long> element, long currentTimestamp) { + return currentMaxTimestamp - 5; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + }).map(new MapFunction<Tuple2<Event, Long>, Event>() { + private static final long serialVersionUID = -5288731103938665328L; + + @Override + public Event map(Tuple2<Event, Long> value) throws Exception { + return value.f0; + } + }); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 2601494641888389648L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -3133506934766766660L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("middle"); + } + }).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -8528031731858936269L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + 
DataStream<String> result = CEP.pattern(input, pattern).select( + new PatternSelectFunction<Event, String>() { + private static final long serialVersionUID = 1447462674590806097L; + + @Override + public String select(Map<String, Event> pattern) { + StringBuilder builder = new StringBuilder(); + + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); + + return builder.toString(); + } + } + ); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // the expected sequence of matching event ids + expected = "1,5,4"; + + env.execute(); + } + + @Test + public void testSimpleKeyedPatternEventTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(2); + + // (Event, timestamp) + DataStream<Event> input = env.fromElements( + Tuple2.of(new Event(1, "start", 1.0), 5L), + Tuple2.of(new Event(1, "middle", 2.0), 1L), + Tuple2.of(new Event(2, "middle", 2.0), 4L), + Tuple2.of(new Event(2, "start", 2.0), 3L), + Tuple2.of(new Event(1, "end", 3.0), 3L), + Tuple2.of(new Event(3, "start", 4.1), 5L), + Tuple2.of(new Event(1, "end", 4.0), 10L), + Tuple2.of(new Event(2, "end", 2.0), 8L), + Tuple2.of(new Event(1, "middle", 5.0), 7L), + Tuple2.of(new Event(3, "middle", 6.0), 9L), + Tuple2.of(new Event(3, "end", 7.0), 7L) + ).assignTimestamps(new TimestampExtractor<Tuple2<Event, Long>>() { + private static final long serialVersionUID = 878281782188702293L; + + private Long currentMaxTimestamp = Long.MIN_VALUE; + + @Override + public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) { + if (currentMaxTimestamp < element.f1) { + currentMaxTimestamp = element.f1; + } + + return element.f1; + } + + @Override + public long extractWatermark(Tuple2<Event, Long> element, long currentTimestamp) { + return 
currentMaxTimestamp - 5; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + }).map(new MapFunction<Tuple2<Event, Long>, Event>() { + private static final long serialVersionUID = -5288731103938665328L; + + @Override + public Event map(Tuple2<Event, Long> value) throws Exception { + return value.f0; + } + }).keyBy(new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -3282946957177720879L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 2601494641888389648L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -3133506934766766660L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("middle"); + } + }).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -8528031731858936269L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select( + new PatternSelectFunction<Event, String>() { + private static final long serialVersionUID = 1447462674590806097L; + + @Override + public String select(Map<String, Event> pattern) { + StringBuilder builder = new StringBuilder(); + + builder.append(pattern.get("start").getId()).append(",") + .append(pattern.get("middle").getId()).append(",") + .append(pattern.get("end").getId()); + + return builder.toString(); + } + } + ); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // the expected sequences of matching event ids + expected = "1,1,1\n2,2,2"; + + 
env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java new file mode 100644 index 0000000..efe56b7 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.util.Objects; + +public class Event { + private String name; + private double price; + private int id; + + public Event(int id, String name, double price) { + this.id = id; + this.name = name; + this.price = price; + } + + public double getPrice() { + return price; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "Event(" + id + ", " + name + ", " + price + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Event) { + Event other = (Event) obj; + + return name.equals(other.name) && price == other.price && id == other.id; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(name, price, id); + } + + public static TypeSerializer<Event> createTypeSerializer() { + TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class); + + return typeInformation.createSerializer(new ExecutionConfig()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java new file mode 100644 index 0000000..adb9446 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/StreamEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Immutable pair of an event and the timestamp associated with it.
 *
 * @param <T> type of the wrapped event
 */
public class StreamEvent<T> {
	private final T event;
	private final long timestamp;

	/**
	 * Factory equivalent of the constructor that lets the compiler infer the event type.
	 *
	 * @param event     event to wrap
	 * @param timestamp timestamp to attach to the event
	 * @param <V>       type of the wrapped event
	 * @return a new stream event holding both values
	 */
	public static <V> StreamEvent<V> of(V event, long timestamp) {
		return new StreamEvent<V>(event, timestamp);
	}

	/**
	 * @param event     event to wrap
	 * @param timestamp timestamp to attach to the event
	 */
	public StreamEvent(T event, long timestamp) {
		this.timestamp = timestamp;
		this.event = event;
	}

	/** @return the wrapped event */
	public T getEvent() {
		return this.event;
	}

	/** @return the timestamp given at construction */
	public long getTimestamp() {
		return this.timestamp;
	}
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +public class SubEvent extends Event { + private final double volume; + + public SubEvent(int id, String name, double price, double volume) { + super(id, name, price); + this.volume = volume; + } + + public double getVolume() { + return volume; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append("SubEvent(") + .append(getId()) + .append(", ") + .append(getName()) + .append(", ") + .append(getPrice()) + .append(", ") + .append(getVolume()) + .append(")"); + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java new file mode 100644 index 0000000..8bc010a --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DeweyNumberTest extends TestLogger { + + @Test + public void testDeweyNumberGeneration() { + DeweyNumber start = new DeweyNumber(1); + DeweyNumber increased = start.increase(); + DeweyNumber increaseAddStage = increased.addStage(); + DeweyNumber startAddStage = start.addStage(); + DeweyNumber startAddStageIncreased = startAddStage.increase(); + DeweyNumber startAddStageIncreasedAddStage = startAddStageIncreased.addStage(); + + assertEquals(DeweyNumber.fromString("1"), start); + assertEquals(DeweyNumber.fromString("2"), increased); + assertEquals(DeweyNumber.fromString("2.0"), increaseAddStage); + assertEquals(DeweyNumber.fromString("1.0"), startAddStage); + assertEquals(DeweyNumber.fromString("1.1"), startAddStageIncreased); + assertEquals(DeweyNumber.fromString("1.1.0"), startAddStageIncreasedAddStage); + + assertTrue(startAddStage.isCompatibleWith(start)); + assertTrue(startAddStageIncreased.isCompatibleWith(startAddStage)); + assertTrue(startAddStageIncreasedAddStage.isCompatibleWith(startAddStageIncreased)); + assertFalse(startAddStageIncreasedAddStage.isCompatibleWith(startAddStage)); + 
assertFalse(increaseAddStage.isCompatibleWith(startAddStage)); + assertFalse(startAddStage.isCompatibleWith(increaseAddStage)); + assertFalse(startAddStageIncreased.isCompatibleWith(startAddStageIncreasedAddStage)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java new file mode 100644 index 0000000..a46a81e --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.StreamEvent; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class NFAITCase extends TestLogger { + + @Test + public void testSimplePatternNFA() { + List<StreamEvent<Event>> inputEvents = new ArrayList<StreamEvent<Event>>(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(43, "end", 1.0); + + inputEvents.add(new StreamEvent<Event>(startEvent, 1)); + inputEvents.add(new StreamEvent<Event>(new Event(43, "foobar", 1.0), 2)); + inputEvents.add(new StreamEvent<Event>(new SubEvent(41, "barfoo", 1.0, 5.0), 3)); + inputEvents.add(new StreamEvent<Event>(middleEvent, 3)); + inputEvents.add(new StreamEvent<Event>(new Event(43, "start", 1.0), 4)); + inputEvents.add(new StreamEvent<Event>(endEvent, 5)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + }) + .followedBy("end").where(new FilterFunction<Event>() { + private static final long 
serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer()); + List<Map<String, Event>> resultingPatterns = new ArrayList<>(); + + for (StreamEvent<Event> inputEvent: inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getEvent(), + inputEvent.getTimestamp()); + + resultingPatterns.addAll(patterns); + } + + assertEquals(1, resultingPatterns.size()); + Map<String, Event> patternMap = resultingPatterns.get(0); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + } + + /** + * Tests that the NFA successfully filters out expired elements with respect to the window + * length + */ + @Test + public void testSimplePatternWithTimeWindowNFA() { + List<StreamEvent<Event>> events = new ArrayList<>(); + List<Map<String, Event>> resultingPatterns = new ArrayList<>(); + + final Event startEvent; + final Event middleEvent; + final Event endEvent; + + events.add(new StreamEvent<Event>(new Event(1, "start", 1.0), 1)); + events.add(new StreamEvent<Event>(startEvent = new Event(2, "start", 1.0), 2)); + events.add(new StreamEvent<Event>(middleEvent = new Event(3, "middle", 1.0), 3)); + events.add(new StreamEvent<Event>(new Event(4, "foobar", 1.0), 4)); + events.add(new StreamEvent<Event>(endEvent = new Event(5, "end", 1.0), 12)); + events.add(new StreamEvent<Event>(new Event(6, "end", 1.0), 13)); + + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 7907391379273505897L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }).followedBy("middle").where(new FilterFunction<Event>() { + private static final long 
serialVersionUID = -3268741540234334074L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("middle"); + } + }).followedBy("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -8995174172182138608L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }).within(Time.milliseconds(10)); + + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer()); + + for (StreamEvent<Event> event: events) { + Collection<Map<String, Event>> patterns = nfa.process(event.getEvent(), event.getTimestamp()); + + resultingPatterns.addAll(patterns); + } + + assertEquals(1, resultingPatterns.size()); + + Map<String, Event> patternMap = resultingPatterns.get(0); + + assertEquals(startEvent, patternMap.get("start")); + assertEquals(middleEvent, patternMap.get("middle")); + assertEquals(endEvent, patternMap.get("end")); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java new file mode 100644 index 0000000..3face76 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.StreamEvent; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class NFATest extends TestLogger { + @Test + public void testSimpleNFA() { + NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0); + List<StreamEvent<Event>> streamEvents = new ArrayList<>(); + + streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); + streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L)); + streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L)); + streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L)); + + State<Event> startingState = new State<>("", State.StateType.Start); + State<Event> startState = new State<>("start", State.StateType.Normal); + State<Event> endState = new State<>("end", State.StateType.Final); + StateTransition<Event> starting2Start = new StateTransition<>( + StateTransitionAction.TAKE, + startState, + new FilterFunction<Event>() { + private static final long serialVersionUID = 
-4869589195918650396L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + } + ); + + StateTransition<Event> start2End = new StateTransition<>( + StateTransitionAction.TAKE, + endState, + new FilterFunction<Event>() { + private static final long serialVersionUID = 2979804163709590673L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + } + ); + + StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null); + + startingState.addStateTransition(starting2Start); + startState.addStateTransition(start2End); + startState.addStateTransition(start2Start); + + nfa.addState(startingState); + nfa.addState(startState); + nfa.addState(endState); + + Set<Map<String, Event>> expectedPatterns = new HashSet<>(); + + Map<String, Event> firstPattern = new HashMap<>(); + firstPattern.put("start", new Event(1, "start", 1.0)); + firstPattern.put("end", new Event(4, "end", 4.0)); + + Map<String, Event> secondPattern = new HashMap<>(); + secondPattern.put("start", new Event(3, "start", 3.0)); + secondPattern.put("end", new Event(4, "end", 4.0)); + + expectedPatterns.add(firstPattern); + expectedPatterns.add(secondPattern); + + Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + + assertEquals(expectedPatterns, actualPatterns); + } + + @Test + public void testTimeoutWindowPruning() { + NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 2); + List<StreamEvent<Event>> streamEvents = new ArrayList<>(); + + streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L)); + streamEvents.add(StreamEvent.of(new Event(2, "bar", 2.0), 2L)); + streamEvents.add(StreamEvent.of(new Event(3, "start", 3.0), 3L)); + streamEvents.add(StreamEvent.of(new Event(4, "end", 4.0), 4L)); + + State<Event> startingState = new State<>("", State.StateType.Start); + State<Event> startState = new State<>("start", 
State.StateType.Normal); + State<Event> endState = new State<>("end", State.StateType.Final); + StateTransition<Event> starting2Start = new StateTransition<>( + StateTransitionAction.TAKE, + startState, + new FilterFunction<Event>() { + private static final long serialVersionUID = -4869589195918650396L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }); + + StateTransition<Event> start2End = new StateTransition<>( + StateTransitionAction.TAKE, + endState, + new FilterFunction<Event>() { + private static final long serialVersionUID = 2979804163709590673L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + StateTransition<Event> start2Start = new StateTransition<>( + StateTransitionAction.IGNORE, + startState, + null); + + startingState.addStateTransition(starting2Start); + startState.addStateTransition(start2End); + startState.addStateTransition(start2Start); + + nfa.addState(startingState); + nfa.addState(startState); + nfa.addState(endState); + + Set<Map<String, Event>> expectedPatterns = new HashSet<>(); + + Map<String, Event> secondPattern = new HashMap<>(); + secondPattern.put("start", new Event(3, "start", 3.0)); + secondPattern.put("end", new Event(4, "end", 4.0)); + + expectedPatterns.add(secondPattern); + + Collection<Map<String, Event>> actualPatterns = runNFA(nfa, streamEvents); + + assertEquals(expectedPatterns, actualPatterns); + } + + @Test + public void testStateNameGeneration() { + String expectedName1 = "a[2]"; + String expectedName2 = "a_3"; + String expectedName3 = "a[][42]"; + + String generatedName1 = NFA.generateStateName("a[]", 2); + String generatedName2 = NFA.generateStateName("a", 3); + String generatedName3 = NFA.generateStateName("a[][]", 42); + + + assertEquals(expectedName1, generatedName1); + assertEquals(expectedName2, generatedName2); + assertEquals(expectedName3, generatedName3); + } + + 
/** Feeds every stream event, together with its timestamp, to the given NFA and returns all matches reported by the NFA, gathered into a single set. */ public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamEvent<T>> inputs) { + Set<Map<String, T>> actualPatterns = new HashSet<>(); + + for (StreamEvent<T> streamEvent: inputs) { + Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getEvent(), streamEvent.getTimestamp()); + + /* the accumulator is a HashSet, so equal matches are kept only once */ actualPatterns.addAll(matchedPatterns); + } + + return actualPatterns; + } + + /** Builds a three-state NFA (start, normal, final), writes it to an in-memory byte array with Java object serialization, reads it back, and asserts that the copy equals the original. */ @Test + public void testNFASerialization() throws IOException, ClassNotFoundException { + NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0); + + State<Event> startingState = new State<>("", State.StateType.Start); + State<Event> startState = new State<>("start", State.StateType.Normal); + State<Event> endState = new State<>("end", State.StateType.Final); + + StateTransition<Event> starting2Start = new StateTransition<>( + StateTransitionAction.TAKE, + startState, + new NameFilter("start")); + + StateTransition<Event> start2End = new StateTransition<>( + StateTransitionAction.TAKE, + endState, + new NameFilter("end")); + + StateTransition<Event> start2Start = new StateTransition<>( + StateTransitionAction.IGNORE, + startState, + /* NOTE(review): a null filter is passed here; presumably it means "no condition" - confirm against StateTransition */ null); + + startingState.addStateTransition(starting2Start); + startState.addStateTransition(start2End); + startState.addStateTransition(start2Start); + + nfa.addState(startingState); + nfa.addState(startState); + nfa.addState(endState); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(nfa); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA<Event> copy = (NFA<Event>) ois.readObject(); + + assertEquals(nfa, copy); + } + + /** Filter that accepts an event only when the event's name equals the name supplied at construction time. */ private static class NameFilter implements FilterFunction<Event> { + + private static final long serialVersionUID = 7472112494752423802L; + + private final String name; + + public NameFilter(final String name) { + this.name = 
name; + } + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals(name); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java new file mode 100644 index 0000000..25618d5 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import com.google.common.collect.LinkedHashMultimap; +import org.apache.flink.cep.Event; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collection; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SharedBufferTest extends TestLogger { + + @Test + public void testSharedBuffer() { + SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer()); + int numberEvents = 8; + Event[] events = new Event[numberEvents]; + final long timestamp = 1L; + + for (int i = 0; i < numberEvents; i++) { + events[i] = new Event(i + 1, "e" + (i + 1), i); + } + + LinkedHashMultimap<String, Event> expectedPattern1 = LinkedHashMultimap.create(); + expectedPattern1.put("a1", events[2]); + expectedPattern1.put("a[]", events[3]); + expectedPattern1.put("b", events[5]); + + LinkedHashMultimap<String, Event> expectedPattern2 = LinkedHashMultimap.create(); + expectedPattern2.put("a1", events[0]); + expectedPattern2.put("a[]", events[1]); + expectedPattern2.put("a[]", events[2]); + expectedPattern2.put("a[]", events[3]); + expectedPattern2.put("a[]", events[4]); + expectedPattern2.put("b", events[5]); + + LinkedHashMultimap<String, Event> expectedPattern3 = LinkedHashMultimap.create(); + expectedPattern3.put("a1", events[0]); + expectedPattern3.put("a[]", events[1]); + expectedPattern3.put("a[]", events[2]); + expectedPattern3.put("a[]", events[3]); + expectedPattern3.put("a[]", events[4]); + expectedPattern3.put("a[]", events[5]); + expectedPattern3.put("a[]", events[6]); + expectedPattern3.put("b", events[7]); + + sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1")); + 
sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2")); + sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0")); + sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1")); + sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0")); + sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0")); + sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1")); + sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); + + Collection<LinkedHashMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + sharedBuffer.remove("b", events[7], timestamp); + Collection<LinkedHashMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, DeweyNumber.fromString("1.1.0")); + Collection<LinkedHashMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("2.0.0")); + Collection<LinkedHashMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, DeweyNumber.fromString("1.0.0")); + sharedBuffer.remove("b", events[5], timestamp); + + assertTrue(sharedBuffer.isEmpty()); + assertTrue(patterns4.isEmpty()); + 
assertEquals(Collections.singletonList(expectedPattern1), patterns1); + assertEquals(Collections.singletonList(expectedPattern2), patterns2); + assertEquals(Collections.singletonList(expectedPattern3), patterns3); + } + + @Test + public void testSharedBufferSerialization() throws IOException, ClassNotFoundException { + SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer()); + int numberEvents = 8; + Event[] events = new Event[numberEvents]; + final long timestamp = 1L; + + for (int i = 0; i < numberEvents; i++) { + events[i] = new Event(i + 1, "e" + (i + 1), i); + } + + sharedBuffer.put("a1", events[0], timestamp, null, null, 0, DeweyNumber.fromString("1")); + sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a1", events[2], timestamp, null, null, 0, DeweyNumber.fromString("2")); + sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, DeweyNumber.fromString("2.0")); + sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.1")); + sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, DeweyNumber.fromString("2.0.0")); + sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, DeweyNumber.fromString("1.0.0")); + sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, DeweyNumber.fromString("1.1")); + sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, DeweyNumber.fromString("1.1.0")); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new 
ObjectOutputStream(baos); + + oos.writeObject(sharedBuffer); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + ObjectInputStream ois = new ObjectInputStream(bais); + + SharedBuffer<String, Event> copy = (SharedBuffer<String, Event>)ois.readObject(); + + assertEquals(sharedBuffer, copy); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java new file mode 100644 index 0000000..eb3ead1 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.compiler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.State; +import org.apache.flink.cep.nfa.StateTransition; +import org.apache.flink.cep.nfa.StateTransitionAction; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class NFACompilerTest extends TestLogger { + + /** + * Tests that the NFACompiler generates the correct NFA from a given Pattern + */ + @Test + public void testNFACompilerWithSimplePattern() { + Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 3314714776170474221L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 2; + } + }) + .followedBy("middle").subtype(SubEvent.class) + .next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = 3990995859716364087L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); + + TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class); + + NFA<Event> nfa = NFACompiler.<Event>compile(pattern, typeInformation.createSerializer(new ExecutionConfig())); + + Set<State<Event>> states = nfa.getStates(); + + assertEquals(4, states.size()); + + Map<String, State<Event>> stateMap = new HashMap<>(); + 
+ for (State<Event> state: states) { + stateMap.put(state.getName(), state); + } + + assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME)); + State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME); + + assertTrue(beginningState.isStart()); + + assertTrue(stateMap.containsKey("start")); + State<Event> startState = stateMap.get("start"); + + Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions(); + Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>(); + + for (StateTransition<Event> transition: startTransitions) { + startTransitionMap.put(transition.getTargetState().getName(), transition); + } + + assertEquals(2, startTransitionMap.size()); + assertTrue(startTransitionMap.containsKey("start")); + + StateTransition<Event> reflexiveTransition = startTransitionMap.get("start"); + assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction()); + + assertTrue(startTransitionMap.containsKey("middle")); + StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle"); + assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction()); + + assertTrue(stateMap.containsKey("middle")); + State<Event> middleState = stateMap.get("middle"); + + Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>(); + + for (StateTransition<Event> transition: middleState.getStateTransitions()) { + middleTransitionMap.put(transition.getTargetState().getName(), transition); + } + + assertEquals(1, middleTransitionMap.size()); + + assertTrue(middleTransitionMap.containsKey("end")); + StateTransition<Event> middleEndTransition = middleTransitionMap.get("end"); + + assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction()); + + assertTrue(stateMap.containsKey("end")); + State<Event> endState = stateMap.get("end"); + + assertTrue(endState.isFinal()); + assertEquals(0, endState.getStateTransitions().size()); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java new file mode 100644 index 0000000..2edf005 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class PatternTest extends TestLogger { + /** + * These tests simply check that the pattern construction completes without failure + */ + + @Test + public void testStrictContiguity() { + Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end"); + Pattern<Object, ?> previous; + Pattern<Object, ?> previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "next"); + assertEquals(previous2.getName(), "start"); + } + + @Test + public void testNonStrictContiguity() { + Pattern<Object, ?> pattern = Pattern.begin("start").followedBy("next").followedBy("end"); + Pattern<Object, ?> previous; + Pattern<Object, ?> previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertTrue(pattern instanceof FollowedByPattern); + assertTrue(previous instanceof FollowedByPattern); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "next"); + assertEquals(previous2.getName(), "start"); + } + + @Test + public void testStrictContiguityWithCondition() { + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("next").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -7657256242101104925L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("foobar"); + } + }).next("end").where(new FilterFunction<Event>() { + private static final long serialVersionUID = -7597452389191504189L; + + @Override + 
public boolean filter(Event value) throws Exception { + return value.getId() == 42; + } + }); + + Pattern<Event, ?> previous; + Pattern<Event, ?> previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertNotNull(pattern.getFilterFunction()); + assertNotNull(previous.getFilterFunction()); + assertNull(previous2.getFilterFunction()); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "next"); + assertEquals(previous2.getName(), "start"); + } + + @Test + public void testPatternWithSubtyping() { + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).followedBy("end"); + + Pattern<Event, ?> previous; + Pattern<Event, ?> previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertNotNull(previous.getFilterFunction()); + assertTrue(previous.getFilterFunction() instanceof SubtypeFilterFunction); + + assertEquals(pattern.getName(), "end"); + assertEquals(previous.getName(), "subevent"); + assertEquals(previous2.getName(), "start"); + } + + @Test + public void testPatternWithSubtypingAndFilter() { + Pattern<Event, Event> pattern = Pattern.<Event>begin("start").next("subevent").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { + private static final long serialVersionUID = -4118591291880230304L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return false; + } + }).followedBy("end"); + + Pattern<Event, ?> previous; + Pattern<Event, ?> previous2; + + assertNotNull(previous = pattern.getPrevious()); + assertNotNull(previous2 = previous.getPrevious()); + assertNull(previous2.getPrevious()); + + assertTrue(pattern instanceof FollowedByPattern); + assertNotNull(previous.getFilterFunction()); + + assertEquals(pattern.getName(), "end"); + 
assertEquals(previous.getName(), "subevent"); + assertEquals(previous2.getName(), "start"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml index e0fbd49..34153c9 100644 --- a/flink-libraries/pom.xml +++ b/flink-libraries/pom.xml @@ -39,5 +39,6 @@ under the License. <module>flink-python</module> <module>flink-table</module> <module>flink-ml</module> + <module>flink-cep</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 1ef3298..96ddda1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -105,7 +105,7 @@ public interface StreamOperator<OUT> extends Serializable { /** * Restores the operator state, if this operator's execution is recovering from a checkpoint. * This method restores the operator state (if the operator is stateful) and the key/value state - * (if it had been used and was initialized when the snapshot ocurred). + * (if it had been used and was initialized when the snapshot occurred). * * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)} * and before {@link #open()}.