[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837832#comment-15837832
 ] 

ASF GitHub Bot commented on FLINK-3318:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790271
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
    @@ -130,26 +114,166 @@
                        }
     
                        // add the beginning state
    -                   final State<T> beginningState;
    +                   State<T> beginningState = 
states.get(BEGINNING_STATE_NAME);;
    +                   addTransitions(beginningState, -1, patterns, states);
    +                   return new NFAFactoryImpl<T>(inputTypeSerializer, 
windowTime, new HashSet<>(states.values()), timeoutHandling);
    +           }
    +   }
     
    -                   if (states.containsKey(BEGINNING_STATE_NAME)) {
    -                           beginningState = 
states.get(BEGINNING_STATE_NAME);
    -                   } else {
    -                           beginningState = new 
State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -                           states.put(BEGINNING_STATE_NAME, 
beginningState);
    -                   }
    +   private static <T> void addTransitions(State<T> currentState, int 
patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +           Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +           State<T> succeedingState = 
states.get(succeedingPattern.getName());
     
    -                   beginningState.addStateTransition(new 
StateTransition<T>(
    +           if (shouldRepeatPattern(patternPos, patterns)) {
    +                   expandRepeatingPattern(currentState, patternPos, 
patterns, states);
    +           } else {
    +                   currentState.addStateTransition(new StateTransition<T>(
                                StateTransitionAction.TAKE,
    -                           currentState,
    -                           (FilterFunction<T>) 
currentPattern.getFilterFunction()
    +                           succeedingState,
    +                           (FilterFunction<T>) 
succeedingPattern.getFilterFunction()
                        ));
     
    -                   return new NFAFactoryImpl<T>(inputTypeSerializer, 
windowTime, new HashSet<>(states.values()), timeoutHandling);
    +                   if (shouldAddSelfTransition(succeedingPattern))  {
    +                           addTransitionToSelf(succeedingPattern, 
succeedingState);
    +                   }
    +                   if (isPatternOptional(succeedingPattern)) {
    +                           addOptionalTransitions(currentState, 
patternPos, patterns, states);
    +                   }
    +           }
    +   }
    +
    +   private static <T> void addOptionalTransitions(State<T> currentState, 
int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> 
states) {
    +           int firstNonOptionalPattern = 
findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +           for (int optionalPatternPos = patternPos + 2;
    +                           optionalPatternPos < 
Math.min(firstNonOptionalPattern + 1, patterns.size());
    +                           optionalPatternPos++) {
    +
    +                   Pattern<T, ?> optionalPattern = 
patterns.get(optionalPatternPos);
    +                   State<T> optionalState = 
states.get(optionalPattern.getName());
    +                   currentState.addStateTransition(new StateTransition<>(
    +                           StateTransitionAction.TAKE,
    +                           optionalState,
    +                           (FilterFunction<T>) 
optionalPattern.getFilterFunction()));
                }
        }
     
        /**
    +    * Expand a pattern number of times and connect expanded states. E.g. 
count(3) wil result in:
    +    *
    +    * +-----+  +-------+  +-------+
    +    * |State+->|State#1+->|State#2+
    +    * +--+--+  +-------+  +--+----+
    +    */
    +   private static <T> void expandRepeatingPattern(State<T> currentState, 
int patternPos,
    +                                                                           
                        ArrayList<Pattern<T, ?>> patterns, Map<String, 
State<T>> states) {
    +           Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +           State<T> succeedingState = 
states.get(succeedingPattern.getName());
    +           Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +           State<T> currentRepeatingState = null;
    +           State<T> nextRepeatingState = currentState;
    +           for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +                   currentRepeatingState = nextRepeatingState;
    +                   nextRepeatingState = new State<>(
    +                           currentState.getName() + "#" + i,
    +                           State.StateType.Normal);
    +                   states.put(nextRepeatingState.getName(), 
nextRepeatingState);
    +                   currentRepeatingState.addStateTransition(new 
StateTransition<T>(
    +                           StateTransitionAction.TAKE,
    +                           nextRepeatingState,
    +                           (FilterFunction<T>) 
currentPattern.getFilterFunction()));
    +
    +                   // Add a transition around optional pattern.
    +                   // count(2,3) will result in:
    +                   // +-----+  +-------+   +-------+  +----+
    +                   // |State+->|State#1+-->|State#2+->|Next|
    +                   // +--+--+  +-------+   +--+----+  +-+--+
    +                   //              |                    ^
    +                   //              +--------------------+
    +                   if (i >= currentPattern.getMinCount()) {
    +                           currentRepeatingState.addStateTransition(new 
StateTransition<T>(
    +                                   StateTransitionAction.TAKE,
    +                                   succeedingState,
    +                                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +                   }
    +           }
    +           nextRepeatingState.addStateTransition(new StateTransition<T>(
    +                   StateTransitionAction.TAKE,
    +                   succeedingState,
    +                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +   }
    +
    +   private static <T> boolean shouldRepeatPattern(int patternPos, 
ArrayList<Pattern<T, ?>> patterns) {
    +           if (patternPos == -1) {
    +                   return false;
    +           }
    +
    +           Pattern<T, ?> pattern = patterns.get(patternPos);
    +           return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +   }
    +
    +   private static <T> void addTransitionToSelf(Pattern<T, ?> 
succeedingPattern, State<T> succeedingState) {
    +           succeedingState.addStateTransition(new StateTransition<T>(
    +                   StateTransitionAction.TAKE,
    +                   succeedingState,
    +                   (FilterFunction<T>) 
succeedingPattern.getFilterFunction()));
    +   }
    +
    +   private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> 
succeedingPattern) {
    +           return succeedingPattern.getQuantifier() == 
Quantifier.ZERO_OR_MANY
    +                   || succeedingPattern.getQuantifier() == 
Quantifier.ONE_OR_MANY;
    +   }
    +
    +   private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, 
?>> patterns, int startPos) {
    +           int pos = startPos;
    +           for (; pos < patterns.size(); pos++) {
    +                   Pattern<T, ?> pattern = patterns.get(pos);
    +                   if (!isPatternOptional(pattern)) {
    +                           return pos;
    +                   }
    +           }
    +
    +           return pos;
    +   }
    +
    +   private static <T> Map<String, State<T>> 
createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
    --- End diff --
    
    The `patterns` can become a `List` instead of `ArrayList`.


> Add support for quantifiers to CEP's pattern API
> ------------------------------------------------
>
>                 Key: FLINK-3318
>                 URL: https://issues.apache.org/jira/browse/FLINK-3318
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.0.0
>            Reporter: Till Rohrmann
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert epsilon-transition between a Kleene start state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to