[FLINK-6265] [cep] Fix consecutive() for times()

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffca0e76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffca0e76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffca0e76

Branch: refs/heads/table-retraction
Commit: ffca0e76eaaefe6251a7664b93c4ec007e4305e9
Parents: 1649f35
Author: kl0u <kklou...@gmail.com>
Authored: Tue Apr 4 18:23:50 2017 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Wed Apr 5 17:20:01 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/nfa/compiler/NFACompiler.java     |   1 -
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 136 ++++++++++++++++++-
 2 files changed, 133 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffca0e76/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index e441c4b..e8077fa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -266,7 +266,6 @@ public class NFACompiler {
                        for (int i = 0; i < times - 1; i++) {
                                lastSink = createSingletonState(
                                        lastSink,
-                                       currentPattern instanceof FollowedByPattern &&
                                        !currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
                        }
                        return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffca0e76/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index da5f413..7359ca8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -1336,9 +1336,10 @@ public class NFAITCase extends TestLogger {
                        }
                }
 
-               assertEquals(1, allPatterns.size());
+               assertEquals(2, allPatterns.size());
                assertEquals(Sets.<Set<Event>>newHashSet(
-                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1)
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+                       Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1)
                ), resultingPatterns);
        }
 
@@ -1363,7 +1364,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
-               }).times(2).followedBy("end1").where(new SimpleCondition<Event>() {
+               }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 5726188262756267490L;
 
                        @Override
@@ -1396,6 +1397,135 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
+       public void testTimesNonStrictWithNext() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                               Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesStrictWithFollowedBy() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.middleEvent2, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesStrictWithNextAndConsecutive() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).next("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, 
Lists.<List<Event>>newArrayList());
+       }
+
+       @Test
        public void testStartWithOptional() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

Reply via email to