Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790105
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
    @@ -130,26 +114,166 @@
                        }
     
                        // add the beginning state
    -                   final State<T> beginningState;
    +                   State<T> beginningState = 
states.get(BEGINNING_STATE_NAME);;
    +                   addTransitions(beginningState, -1, patterns, states);
    +                   return new NFAFactoryImpl<T>(inputTypeSerializer, 
windowTime, new HashSet<>(states.values()), timeoutHandling);
    +           }
    +   }
     
    -                   if (states.containsKey(BEGINNING_STATE_NAME)) {
    -                           beginningState = 
states.get(BEGINNING_STATE_NAME);
    -                   } else {
    -                           beginningState = new 
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -                           states.put(BEGINNING_STATE_NAME, 
beginningState);
    -                   }
    +   private static <T> void addTransitions(State<T> currentState, int 
patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +           Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +           State<T> succeedingState = 
states.get(succeedingPattern.getName());
     
    -                   beginningState.addStateTransition(new 
StateTransition<T>(
    +           if (shouldRepeatPattern(patternPos, patterns)) {
    +                   expandRepeatingPattern(currentState, patternPos, 
patterns, states);
    +           } else {
    +                   currentState.addStateTransition(new StateTransition<T>(
                                StateTransitionAction.TAKE,
    -                           currentState,
    -                           (FilterFunction<T>) 
currentPattern.getFilterFunction()
    +                           succeedingState,
    +                           (FilterFunction<T>) 
succeedingPattern.getFilterFunction()
                        ));
     
    -                   return new NFAFactoryImpl<T>(inputTypeSerializer, 
windowTime, new HashSet<>(states.values()), timeoutHandling);
    +                   if (shouldAddSelfTransition(succeedingPattern))  {
    +                           addTransitionToSelf(succeedingPattern, 
succeedingState);
    +                   }
    +                   if (isPatternOptional(succeedingPattern)) {
    +                           addOptionalTransitions(currentState, 
patternPos, patterns, states);
    +                   }
    +           }
    +   }
    +
    +   private static <T> void addOptionalTransitions(State<T> currentState, 
int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> 
states) {
    +           int firstNonOptionalPattern = 
findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +           for (int optionalPatternPos = patternPos + 2;
    +                           optionalPatternPos < 
Math.min(firstNonOptionalPattern + 1, patterns.size());
    +                           optionalPatternPos++) {
    +
    +                   Pattern<T, ?> optionalPattern = 
patterns.get(optionalPatternPos);
    +                   State<T> optionalState = 
states.get(optionalPattern.getName());
    +                   currentState.addStateTransition(new StateTransition<>(
    +                           StateTransitionAction.TAKE,
    +                           optionalState,
    +                           (FilterFunction<T>) 
optionalPattern.getFilterFunction()));
                }
        }
     
        /**
    +    * Expand a pattern number of times and connect expanded states. E.g. 
count(3) wil result in:
    +    *
    +    * +-----+  +-------+  +-------+
    +    * |State+->|State#1+->|State#2+
    +    * +--+--+  +-------+  +--+----+
    +    */
    +   private static <T> void expandRepeatingPattern(State<T> currentState, 
int patternPos,
    +                                                                           
                        ArrayList<Pattern<T, ?>> patterns, Map<String, 
State<T>> states) {
    +           Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +           State<T> succeedingState = 
states.get(succeedingPattern.getName());
    +           Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +           State<T> currentRepeatingState = null;
    +           State<T> nextRepeatingState = currentState;
    +           for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +                   currentRepeatingState = nextRepeatingState;
    +                   nextRepeatingState = new State<>(
    +                           currentState.getName() + "#" + i,
    +                           State.StateType.Normal);
    +                   states.put(nextRepeatingState.getName(), 
nextRepeatingState);
    +                   currentRepeatingState.addStateTransition(new 
StateTransition<T>(
    +                           StateTransitionAction.TAKE,
    +                           nextRepeatingState,
    +                           (FilterFunction<T>) 
currentPattern.getFilterFunction()));
    +
    +                   // Add a transition around optional pattern.
    +                   // count(2,3) will result in:
    +                   // +-----+  +-------+   +-------+  +----+
    +                   // |State+->|State#1+-->|State#2+->|Next|
    +                   // +--+--+  +-------+   +--+----+  +-+--+
    +                   //              |                    ^
    +                   //              +--------------------+
    +                   if (i >= currentPattern.getMinCount()) {
    +                           currentRepeatingState.addStateTransition(new 
StateTransition<T>(
    +                                   StateTransitionAction.TAKE,
    +                                   succeedingState,
    +                                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +                   }
    +           }
    +           nextRepeatingState.addStateTransition(new StateTransition<T>(
    +                   StateTransitionAction.TAKE,
    +                   succeedingState,
    +                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +   }
    +
    +   private static <T> boolean shouldRepeatPattern(int patternPos, 
ArrayList<Pattern<T, ?>> patterns) {
    +           if (patternPos == -1) {
    +                   return false;
    +           }
    +
    +           Pattern<T, ?> pattern = patterns.get(patternPos);
    +           return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +   }
    +
    +   private static <T> void addTransitionToSelf(Pattern<T, ?> 
succeedingPattern, State<T> succeedingState) {
    +           succeedingState.addStateTransition(new StateTransition<T>(
    +                   StateTransitionAction.TAKE,
    +                   succeedingState,
    +                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +   }
    +
    +   private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> 
succeedingPattern) {
    +           return succeedingPattern.getQuantifier() == 
Quantifier.ZERO_OR_MANY
    +                   || succeedingPattern.getQuantifier() == 
Quantifier.ONE_OR_MANY;
    +   }
    +
    +   private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, 
?>> patterns, int startPos) {
    --- End diff --
    
    The `patterns` parameter can be declared as `List` instead of `ArrayList`, since the method only reads from it and does not need the concrete type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to