dianfu commented on code in PR #19295: URL: https://github.com/apache/flink/pull/19295#discussion_r895598233
########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ########## @@ -261,15 +261,31 @@ public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime( final NFAState nfaState, final long timestamp) throws Exception { + return advanceTimeAndHandlePendingState(sharedBufferAccessor, nfaState, timestamp).f1; + } + + public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> + advanceTimeAndHandlePendingState( Review Comment: There is no need to introduce advanceTimeAndHandlePendingState. We could just update the signature of advanceTime if needed. ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions + if (currentPattern.getWindowTime() != null Review Comment: should use `windowTime` instead of `currentPattern.getWindowTime()`. This also means that we need to calculate `windowTime` in advance. ########## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java: ########## @@ -630,6 +630,102 @@ public String select(Map<String, List<Event>> pattern) { assertEquals(expected, resultList); } + @Test + public void testNotFollowedByWithIn() throws Exception { Review Comment: Could we support the following pattern? If so, it would be great to add an ITCase. ``` Pattern.begin('A').notFollowedBy('B').followedBy('C').times(0, 2).withIn(Time.milliseconds(3)) ``` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -158,9 +158,10 @@ public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) { */ void compileFactory() { if (currentPattern.getQuantifier().getConsumingStrategy() - == Quantifier.ConsumingStrategy.NOT_FOLLOW) { + == Quantifier.ConsumingStrategy.NOT_FOLLOW Review Comment: Currently, the windowTime is the minimum of all window times and so it may happen that the window time is defined in the other Pattern. What about moving this check to the end of this method? ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions + if (currentPattern.getWindowTime() != null + && currentPattern.getWindowTime().toMilliseconds() > 0 + && sinkState.isFinal()) { + final State<T> notFollow = + createState(currentPattern.getName(), State.StateType.Pending); + final IterativeCondition<T> notCondition = getTakeCondition(currentPattern); + final State<T> stopState = + createStopState(notCondition, currentPattern.getName()); + notFollow.addTake(stopState, notCondition); Review Comment: ```suggestion notFollow.addProceed(stopState, notCondition); ``` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { // skip notFollow patterns, they are converted into edge conditions + if (currentPattern.getWindowTime() != null + && currentPattern.getWindowTime().toMilliseconds() > 0 + && sinkState.isFinal()) { Review Comment: ```suggestion && lastSink.isFinal()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org