[ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065948#comment-16065948 ]
ASF GitHub Bot commented on FLINK-7008: --------------------------------------- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4195#discussion_r124455303 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java --- @@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testNFAChange() { + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 1858562682635302605L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).notFollowedBy("not").where(new IterativeCondition<Event>() { + private static final long serialVersionUID = -6085237016591726715L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("c"); + } + }).followedByAny("middle").where(new IterativeCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("b"); + } + }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("end").where(new IterativeCondition<Event>() { + private static final long serialVersionUID = 8061969839441121955L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + return value.getName().equals("e"); + } + }).within(Time.milliseconds(10)); + + NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true); + NFA<Event> nfa = nfaFactory.createNFA(); + nfa.process(new Event(1, "b", 1.0), 1L); + assertFalse(nfa.isNFAChanged()); + + nfa.nfaChanged = false; --- End diff -- @dawidwys thanks a lot for the review. Have added more comments. > Update NFA state only when the NFA changes. > ------------------------------------------- > > Key: FLINK-7008 > URL: https://issues.apache.org/jira/browse/FLINK-7008 > Project: Flink > Issue Type: Improvement > Components: CEP > Affects Versions: 1.3.1 > Reporter: Kostas Kloudas > Assignee: Dian Fu > Fix For: 1.4.0 > > > Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we > update the NFA state every time the NFA is touched. This leads to redundant > puts/gets to the state when there are no changes to the NFA itself. -- This message was sent by Atlassian JIRA (v6.4.14#64029)