dianfu commented on code in PR #19295:
URL: https://github.com/apache/flink/pull/19295#discussion_r895598233


##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -261,15 +261,31 @@ public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
             final NFAState nfaState,
             final long timestamp)
             throws Exception {
+        return advanceTimeAndHandlePendingState(sharedBufferAccessor, nfaState, timestamp).f1;
+    }
+
+    public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>>
+            advanceTimeAndHandlePendingState(

Review Comment:
   There is no need to introduce `advanceTimeAndHandlePendingState`. We could
   just update the signature of `advanceTime` if needed.
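
To illustrate the idea only: a minimal, self-contained sketch of a single `advanceTime` whose return type is widened to carry both collections. `Partial`, `Result` and the expiry rule are hypothetical stand-ins, not Flink's NFA classes; the assumption (read from the return type in the diff) is that `f0` holds matches completed from pending states and `f1` holds timed-out partial matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AdvanceTimeSketch {

    /** Hypothetical partial match: its start time and whether it sits in a pending state. */
    static final class Partial {
        final String id;
        final long startTimestamp;
        final boolean pending;

        Partial(String id, long startTimestamp, boolean pending) {
            this.id = id;
            this.startTimestamp = startTimestamp;
            this.pending = pending;
        }
    }

    /** Hypothetical result pair, standing in for a Tuple2. */
    static final class Result {
        final List<String> f0 = new ArrayList<>(); // assumed: matches completed from pending states
        final List<String> f1 = new ArrayList<>(); // assumed: timed-out partial matches
    }

    /** One method with a widened return type instead of a second public entry point. */
    static Result advanceTime(List<Partial> partials, long windowMillis, long timestamp) {
        Result result = new Result();
        for (Partial p : partials) {
            // Assumed expiry rule for this sketch: the window has fully elapsed.
            boolean expired = timestamp - p.startTimestamp >= windowMillis;
            if (!expired) {
                continue;
            }
            if (p.pending) {
                result.f0.add(p.id);
            } else {
                result.f1.add(p.id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Partial> partials = Arrays.asList(
                new Partial("a", 0L, true),
                new Partial("b", 0L, false));
        Result result = advanceTime(partials, 3L, 4L);
        // A caller that only cares about timeouts reads f1, as the old return value did.
        System.out.println(result.f1);
    }
}
```

Callers that only need the timeouts would read `f1`, so no delegating wrapper method is required.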



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge conditions
+                    if (currentPattern.getWindowTime() != null

Review Comment:
   We should use `windowTime` here instead of `currentPattern.getWindowTime()`.
   This also means that `windowTime` needs to be calculated in advance.



##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java:
##########
@@ -630,6 +630,102 @@ public String select(Map<String, List<Event>> pattern) {
         assertEquals(expected, resultList);
     }
 
+    @Test
+    public void testNotFollowedByWithIn() throws Exception {

Review Comment:
   Could we support the following pattern? If so, it would be great to add an
   ITCase for it.
   ```
   Pattern.begin("A").notFollowedBy("B").followedBy("C").times(0, 2).within(Time.milliseconds(3))
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -158,9 +158,10 @@ public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) {
          */
         void compileFactory() {
             if (currentPattern.getQuantifier().getConsumingStrategy()
-                    == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+                            == Quantifier.ConsumingStrategy.NOT_FOLLOW

Review Comment:
   Currently, `windowTime` is the minimum of all window times, so the window
   time may be defined on another Pattern in the chain rather than on
   `currentPattern`. What about moving this check to the end of this method?
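
To make the concern concrete, here is a minimal, self-contained sketch. `Stage` and both helper methods are hypothetical, not the `NFACompiler` code, and the rule being checked (a trailing notFollowedBy is acceptable only when some pattern in the chain defines a positive window) is an assumption about the intent of this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class WindowTimeCheckSketch {

    /** Hypothetical stand-in for one pattern in the chain; windowMillis is null when no window is set on it. */
    static final class Stage {
        final String name;
        final boolean notFollow;
        final Long windowMillis;

        Stage(String name, boolean notFollow, Long windowMillis) {
            this.name = name;
            this.notFollow = notFollow;
            this.windowMillis = windowMillis;
        }
    }

    /** Minimum window over every stage that declares one; empty if none does. */
    static OptionalLong minWindowMillis(List<Stage> chain) {
        return chain.stream()
                .filter(s -> s.windowMillis != null)
                .mapToLong(s -> s.windowMillis)
                .min();
    }

    /** Assumed rule: validate only after the whole chain has been walked and the minimum is known. */
    static boolean isTrailingNotFollowAllowed(List<Stage> chain) {
        Stage last = chain.get(chain.size() - 1);
        if (!last.notFollow) {
            return true;
        }
        OptionalLong window = minWindowMillis(chain);
        return window.isPresent() && window.getAsLong() > 0;
    }

    public static void main(String[] args) {
        // The window is declared on "A", not on the trailing notFollow stage "B".
        List<Stage> chain = Arrays.asList(
                new Stage("A", false, 3L),
                new Stage("B", true, null));
        System.out.println(isTrailingNotFollowAllowed(chain));
    }
}
```

A check that looks only at the last stage's own window would reject this chain, which is why the validation is better placed after the minimum has been computed.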



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge conditions
+                    if (currentPattern.getWindowTime() != null
+                            && currentPattern.getWindowTime().toMilliseconds() > 0
+                            && sinkState.isFinal()) {
+                        final State<T> notFollow =
+                                createState(currentPattern.getName(), State.StateType.Pending);
+                        final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
+                        final State<T> stopState =
+                                createStopState(notCondition, currentPattern.getName());
+                        notFollow.addTake(stopState, notCondition);

Review Comment:
   ```suggestion
                           notFollow.addProceed(stopState, notCondition);
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T> sinkState) {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge conditions
+                    if (currentPattern.getWindowTime() != null
+                            && currentPattern.getWindowTime().toMilliseconds() > 0
+                            && sinkState.isFinal()) {

Review Comment:
   ```suggestion
                               && lastSink.isFinal()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
