[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837828#comment-15837828 ]
ASF GitHub Bot commented on FLINK-3318: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790105 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, - currentState, - (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * +-----+ +-------+ +-------+ + * |State+->|State#1+->|State#2+ + * +--+--+ +-------+ +--+----+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. + // count(2,3) will result in: + // +-----+ +-------+ +-------+ +----+ + // |State+->|State#1+-->|State#2+->|Next| + // +--+--+ +-------+ +--+----+ +-+--+ + // | ^ + // +--------------------+ + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`. > Add support for quantifiers to CEP's pattern API > ------------------------------------------------ > > Key: FLINK-3318 > URL: https://issues.apache.org/jira/browse/FLINK-3318 > Project: Flink > Issue Type: Improvement > Components: CEP > Affects Versions: 1.0.0 > Reporter: Till Rohrmann > Assignee: Ivan Mushketyk > Priority: Minor > > It would be a good addition to extend the pattern API to support quantifiers > known from regular expressions (e.g. Kleene star, ?, +, or count bounds). > This would considerably enrich the set of supported patterns. > Implementing the count bounds could be done by unrolling the pattern state. > In order to support the Kleene star operator, the {{NFACompiler}} has to be > extended to insert epsilon-transition between a Kleene start state and the > succeeding pattern state. In order to support {{?}}, one could insert two > paths from the preceding state, one which accepts the event and another which > directly goes into the next pattern state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)