Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790105 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -130,26 +114,166 @@ } // add the beginning state - final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); - beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, - currentState, - (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); - return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, 
Map<String, State<T>> states) { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * +-----+ +-------+ +-------+ + * |State+->|State#1+->|State#2+ + * +--+--+ +-------+ +--+----+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. 
+ // count(2,3) will result in: + // +-----+ +-------+ +-------+ +----+ + // |State+->|State#1+-->|State#2+->|Next| + // +--+--+ +-------+ +--+----+ +-+--+ + // | ^ + // +--------------------+ + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { --- End diff -- The `patterns` can become a `List` instead of `ArrayList`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---