[FLINK-3318] Add support for quantifiers to CEP's pattern API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef Branch: refs/heads/table-retraction Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a Parents: d0695c0 Author: Dawid Wysakowicz <da...@getindata.com> Authored: Sat Mar 18 20:53:00 2017 +0100 Committer: kl0u <kklou...@gmail.com> Committed: Thu Mar 23 10:47:55 2017 +0100 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 78 +- .../flink/cep/scala/pattern/Pattern.scala | 81 +- .../apache/flink/cep/nfa/ComputationState.java | 44 +- .../org/apache/flink/cep/nfa/DeweyNumber.java | 17 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 398 ++++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 25 +- .../java/org/apache/flink/cep/nfa/State.java | 32 +- .../apache/flink/cep/nfa/StateTransition.java | 20 +- .../flink/cep/nfa/StateTransitionAction.java | 4 +- .../nfa/compiler/MalformedPatternException.java | 32 - .../flink/cep/nfa/compiler/NFACompiler.java | 317 ++++- .../flink/cep/pattern/FilterFunctions.java | 44 + .../cep/pattern/MalformedPatternException.java | 32 + .../flink/cep/pattern/NotFilterFunction.java | 42 + .../org/apache/flink/cep/pattern/Pattern.java | 115 ++ .../apache/flink/cep/pattern/Quantifier.java | 54 + .../org/apache/flink/cep/nfa/NFAITCase.java | 1338 +++++++++++++++++- .../java/org/apache/flink/cep/nfa/NFATest.java | 90 +- .../flink/cep/nfa/compiler/NFACompilerTest.java | 152 +- .../apache/flink/cep/pattern/PatternTest.java | 54 + 20 files changed, 2595 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md 
b/docs/dev/libs/cep.md index 8047481..22cffbc 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class); patternState.within(Time.seconds(10)); {% endhighlight %} </td> - </tr> + </tr> + <tr> + <td><strong>ZeroOrMore</strong></td> + <td> + <p>Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.</p> + <p>If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches: B, A1 B and A1 A2 B. If eagerness is disabled, it generates: B, A1 B, A2 B and A1 A2 B.</p> + {% highlight java %} + patternState.zeroOrMore(); + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>OneOrMore</strong></td> + <td> + <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and up to an unbounded number of events can be matched in this state.</p> + <p>If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches: A1 B and A1 A2 B. If eagerness is disabled, it generates: A1 B, A2 B and A1 A2 B.</p> + {% highlight java %} + patternState.oneOrMore(); + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>Optional</strong></td> + <td> + <p>Specifies that this pattern can occur zero times or once.</p> + {% highlight java %} + patternState.optional(); + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>Times</strong></td> + <td> + <p>Specifies the exact number of times that this pattern should be matched.</p> + {% highlight java %} + patternState.times(2); + {% endhighlight %} + </td> + </tr> </tbody> </table> </div> @@ -419,6 +457,44 @@ patternState.within(Time.seconds(10)) {% endhighlight %} </td> </tr> + <tr> + <td><strong>ZeroOrMore</strong></td> + <td> + <p>Specifies that this pattern can occur zero or more times (Kleene star).
This means any number of events can be matched in this state.</p> + <p>If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches: B, A1 B and A1 A2 B. If eagerness is disabled, it generates: B, A1 B, A2 B and A1 A2 B.</p> + {% highlight scala %} + patternState.zeroOrMore() + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>OneOrMore</strong></td> + <td> + <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and up to an unbounded number of events can be matched in this state.</p> + <p>If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches: A1 B and A1 A2 B. If eagerness is disabled, it generates: A1 B, A2 B and A1 A2 B.</p> + {% highlight scala %} + patternState.oneOrMore() + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>Optional</strong></td> + <td> + <p>Specifies that this pattern can occur zero times or once.</p> + {% highlight scala %} + patternState.optional() + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>Times</strong></td> + <td> + <p>Specifies the exact number of times that this pattern should be matched.</p> + {% highlight scala %} + patternState.times(2) + {% endhighlight %} + </td> + </tr> </tbody> </table> </div> http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index cc3b03c..5baf780 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern import
org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.cep -import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern} import org.apache.flink.streaming.api.windowing.time.Time /** @@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { /** * + * @return the quantifier currently applied to this pattern + */ + def getQuantifier: Quantifier = jPattern.getQuantifier + + /** + * * @return Filter condition for an event to be matched */ def getFilterFunction(): Option[FilterFunction[F]] = { @@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { wrapPattern(jPattern.getPrevious()) } + /** + * Specifies that this pattern can occur zero or more times (Kleene star). + * This means any number of events can be matched in this state. + * + * @return The same pattern with applied Kleene star operator + */ + def zeroOrMore: Pattern[T, F] = { + jPattern.zeroOrMore() + this + } + + /** + * Specifies that this pattern can occur zero or more times (Kleene star). + * This means any number of events can be matched in this state. + * + * If eagerness is enabled, the pattern A*B applied to the sequence A1 A2 B generates: + * B, A1 B and A1 A2 B. If disabled, it generates B, A1 B, A2 B and A1 A2 B. + * + * @param eager if true the pattern always consumes earlier events + * @return The same pattern with applied Kleene star operator + */ + def zeroOrMore(eager: Boolean): Pattern[T, F] = { + jPattern.zeroOrMore(eager) + this + } + + /** + * Specifies that this pattern can occur one or more times (Kleene plus). + * This means at least one and at most infinite number of events can be matched in this state. + * + * @return The same pattern with applied Kleene plus operator + */ + def oneOrMore: Pattern[T, F] = { + jPattern.oneOrMore() + this + } + + /** + * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most infinite number of events can be matched in this state. + * + * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns: + * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B. + * + * @param eager if true the pattern always consumes earlier events + * @return The same pattern with applied Kleene plus operator + */ + def oneOrMore(eager: Boolean): Pattern[T, F] = { + jPattern.oneOrMore(eager) + this + } + + /** + * Specifies that this pattern can occur zero or once. + * + * @return The same pattern with applied Kleene ? operator + */ + def optional: Pattern[T, F] = { + jPattern.optional() + this + } + + /** + * Specifies exact number of times that this pattern should be matched. + * + * @param times number of times matching event must appear + * @return The same pattern with number of times applied + */ + def times(times: Int): Pattern[T, F] = { + jPattern.times(times) + this + } + } object Pattern { http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 3f44fba..445d038 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -18,6 +18,8 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.util.Preconditions; + /** * Helper class which encapsulates the state of the NFA computation. 
It points to the current state, * the last taken event, its occurrence timestamp, the current version and the starting timestamp @@ -41,17 +43,21 @@ public class ComputationState<T> { // Timestamp of the first element in the pattern private final long startTimestamp; - public ComputationState( - final State<T> currentState, - final T event, - final long timestamp, - final DeweyNumber version, - final long startTimestamp) { + private final State<T> previousState; + + private ComputationState( + final State<T> currentState, + final State<T> previousState, + final T event, + final long timestamp, + final DeweyNumber version, + final long startTimestamp) { this.state = currentState; this.event = event; this.timestamp = timestamp; this.version = version; this.startTimestamp = startTimestamp; + this.previousState = previousState; } public boolean isFinalState() { @@ -59,7 +65,7 @@ public class ComputationState<T> { } public boolean isStartState() { - return state.isStart(); + return state.isStart() && event == null; } public long getTimestamp() { @@ -74,6 +80,10 @@ public class ComputationState<T> { return state; } + public State<T> getPreviousState() { + return previousState; + } + public T getEvent() { return event; } @@ -81,4 +91,24 @@ public class ComputationState<T> { public DeweyNumber getVersion() { return version; } + + public static <T> ComputationState<T> createStartState(final State<T> state) { + Preconditions.checkArgument(state.isStart()); + return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L); + } + + public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) { + Preconditions.checkArgument(state.isStart()); + return new ComputationState<>(state, null, null, -1L, version, -1L); + } + + public static <T> ComputationState<T> createState( + final State<T> currentState, + final State<T> previousState, + final T event, + final long timestamp, + final DeweyNumber version, + final long 
startTimestamp) { + return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index bb9039d..fd3fafa 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable { this.deweyNumber = deweyNumber; } + public DeweyNumber(DeweyNumber number) { + this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); + } + /** * Checks whether this dewey number is compatible to the other dewey number. 
* @@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable { * @return A new dewey number derived from this whose last digit is increased by one */ public DeweyNumber increase() { + return increase(1); + } + + /** + * Creates a new dewey number from this such that its last digit is increased by the supplied + * number + * + * @param times how many times to increase the Dewey number + * @return A new dewey number derived from this whose last digit is increased by given number + */ + public DeweyNumber increase(int times) { int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length); - newDeweyNumber[deweyNumber.length - 1]++; + newDeweyNumber[deweyNumber.length - 1] += times; return new DeweyNumber(newDeweyNumber); } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 257418a..3d42248 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -19,6 +19,7 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.LinkedHashMultimap; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; @@ -39,7 +40,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -87,7 +87,7 @@ public class NFA<T> implements Serializable { /** * Buffer used to store the matched 
events. */ - private final SharedBuffer<State<T>, T> sharedBuffer; + private final SharedBuffer<String, T> sharedBuffer; /** * A set of all the valid NFA states, as returned by the @@ -98,7 +98,7 @@ public class NFA<T> implements Serializable { /** * The length of a windowed pattern, as specified using the - * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)} + * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)} * method. */ private final long windowTime; @@ -109,9 +109,6 @@ public class NFA<T> implements Serializable { */ private final boolean handleTimeout; - // Current starting index for the next dewey version number - private int startEventCounter; - /** * Current set of {@link ComputationState computation states} within the state machine. * These are the "active" intermediate states that are waiting for new matching @@ -119,8 +116,6 @@ public class NFA<T> implements Serializable { */ private transient Queue<ComputationState<T>> computationStates; - private StateTransitionComparator<T> stateTransitionComparator; - public NFA( final TypeSerializer<T> eventSerializer, final long windowTime, @@ -129,11 +124,10 @@ public class NFA<T> implements Serializable { this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); - this.computationStates = new LinkedList<>(); - this.states = new HashSet<>(); - this.startEventCounter = 1; - this.stateTransitionComparator = new StateTransitionComparator<>(); + sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + computationStates = new LinkedList<>(); + + states = new HashSet<>(); } public Set<State<T>> getStates() { @@ -150,7 +144,7 @@ public class NFA<T> implements Serializable { states.add(state); if (state.isStart()) { - computationStates.add(new ComputationState<>(state, null, -1L, null,
-1L)); + computationStates.add(ComputationState.createStartState(state)); } } @@ -201,8 +195,8 @@ public class NFA<T> implements Serializable { } // remove computation state which has exceeded the window length - sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); - sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp()); + sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp()); newComputationStates = Collections.emptyList(); } else if (event != null) { @@ -218,8 +212,8 @@ public class NFA<T> implements Serializable { result.addAll(matches); // remove found patterns because they are no longer needed - sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp()); - sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp()); + sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); + sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp()); } else { // add new computation state; it will be processed once the next event arrives computationStates.add(newComputationState); @@ -252,8 +246,7 @@ public class NFA<T> implements Serializable { return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) && sharedBuffer.equals(other.sharedBuffer) && states.equals(other.states) && - windowTime == other.windowTime && - startEventCounter == other.startEventCounter; + windowTime == other.windowTime; } else { return false; } @@ -261,12 +254,80 @@ public class NFA<T> implements 
Serializable { @Override public int hashCode() { - return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter); + return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime); + } + + private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) { + return s1.getName().equals(s2.getName()); } /** + * Class for storing resolved transitions. It counts at insert time the number of + * branching transitions both for IGNORE and TAKE actions. + */ + private static class OutgoingEdges<T> { + private List<StateTransition<T>> edges = new ArrayList<>(); + + private final State<T> currentState; + + private int totalTakeBranches = 0; + private int totalIgnoreBranches = 0; + + OutgoingEdges(final State<T> currentState) { + this.currentState = currentState; + } + + void add(StateTransition<T> edge) { + + if (!isSelfIgnore(edge)) { + if (edge.getAction() == StateTransitionAction.IGNORE) { + totalIgnoreBranches++; + } else if (edge.getAction() == StateTransitionAction.TAKE) { + totalTakeBranches++; + } + } + + edges.add(edge); + } + + int getTotalIgnoreBranches() { + return totalIgnoreBranches; + } + int getTotalTakeBranches() { + return totalTakeBranches; + } + + List<StateTransition<T>> getEdges() { + return edges; + } + + private boolean isSelfIgnore(final StateTransition<T> edge) { + return isEquivalentState(edge.getTargetState(), currentState) && + edge.getAction() == StateTransitionAction.IGNORE; + } + } + + + /** * Computes the next computation states based on the given computation state, the current event, - * its timestamp and the internal state machine. + * its timestamp and the internal state machine. The algorithm is: + * + * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges} + * 2. 
Perform transitions: + * a) IGNORE (links in {@link SharedBuffer} will still point to the previous event) + * - do not perform for Start State - special case + * - if stays in the same state increase the current stage for future use with number of + * outgoing edges + * - if after PROCEED increase current stage and add new stage (as we change the state) + * - lock the entry in {@link SharedBuffer} as it is needed in the created branch + * b) TAKE (links in {@link SharedBuffer} will point to the current event) + * - add entry to the shared buffer with version of the current computation state + * - add stage and then increase with number of takes for the future computation states + * - peek to the next state if it has PROCEED path to a Final State, if true create + * Final ComputationState to emit results + * 3. Handle the Start State, as it always have to remain + * 4. Release the corresponding entries in {@link SharedBuffer}. + * * * @param computationState Current computation state * @param event Current event which is processed @@ -277,31 +338,179 @@ public class NFA<T> implements Serializable { final ComputationState<T> computationState, final T event, final long timestamp) { - Stack<State<T>> states = new Stack<>(); - List<ComputationState<T>> resultingComputationStates = new ArrayList<>(); - State<T> state = computationState.getState(); - states.push(state); + final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event); + + // Create the computing version based on the previously computed edges + // We need to defer the creation of computation states until we know how many edges start + // at this computation state so that we can assign proper version + final List<StateTransition<T>> edges = outgoingEdges.getEdges(); + int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); + int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); + + final List<ComputationState<T>> resultingComputationStates = new 
ArrayList<>(); + for (StateTransition<T> edge : edges) { + switch (edge.getAction()) { + case IGNORE: { + if (!computationState.isStartState()) { + final DeweyNumber version; + if (isEquivalentState(edge.getTargetState(), computationState.getState())) { + //Stay in the same state (it can be either looping one or singleton) + final int toIncrease = calculateIncreasingSelfState( + outgoingEdges.getTotalIgnoreBranches(), + outgoingEdges.getTotalTakeBranches()); + version = computationState.getVersion().increase(toIncrease); + } else { + //IGNORE after PROCEED + version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage(); + ignoreBranchesToVisit--; + } - boolean branched = false; - while (!states.isEmpty()) { - State<T> currentState = states.pop(); - final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions()); + resultingComputationStates.add( + ComputationState.createState( + edge.getTargetState(), + computationState.getPreviousState(), + computationState.getEvent(), + computationState.getTimestamp(), + version, + computationState.getStartTimestamp() + ) + ); + sharedBuffer.lock( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + } + } + break; + case TAKE: + final State<T> newState = edge.getTargetState(); + final State<T> consumingState = edge.getSourceState(); + final State<T> previousEventState = computationState.getPreviousState(); + + final T previousEvent = computationState.getEvent(); + final DeweyNumber currentVersion = computationState.getVersion(); + + final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + takeBranchesToVisit--; + + final long startTimestamp; + if (computationState.isStartState()) { + startTimestamp = timestamp; + sharedBuffer.put( + consumingState.getName(), + event, + timestamp, + currentVersion); + } else { + startTimestamp = 
computationState.getStartTimestamp(); + sharedBuffer.put( + consumingState.getName(), + event, + timestamp, + previousEventState.getName(), + previousEvent, + computationState.getTimestamp(), + currentVersion); + } - // this is for when we restore from legacy. In that case, the comparator is null - // as it did not exist in the previous Flink versions, so we have to initialize it here. + // a new computation state is referring to the shared entry + sharedBuffer.lock(consumingState.getName(), event, timestamp); + + resultingComputationStates.add(ComputationState.createState( + newState, + consumingState, + event, + timestamp, + newComputationStateVersion, + startTimestamp + )); + + //check if newly created state is optional (have a PROCEED path to Final state) + final State<T> finalState = findFinalStateAfterProceed(newState, event); + if (finalState != null) { + sharedBuffer.lock(consumingState.getName(), event, timestamp); + resultingComputationStates.add(ComputationState.createState( + finalState, + consumingState, + event, + timestamp, + newComputationStateVersion, + startTimestamp)); + } + break; + } + } - if (stateTransitionComparator == null) { - stateTransitionComparator = new StateTransitionComparator(); + if (computationState.isStartState()) { + final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches()); + final ComputationState<T> startState = createStartState(computationState, totalBranches); + resultingComputationStates.add(startState); + } + + if (computationState.getEvent() != null) { + // release the shared entry referenced by the current computation state. 
+ sharedBuffer.release( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + // try to remove unnecessary shared buffer entries + sharedBuffer.remove( + computationState.getPreviousState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + } + + return resultingComputationStates; + } + + private State<T> findFinalStateAfterProceed(State<T> state, T event) { + final Stack<State<T>> statesToCheck = new Stack<>(); + statesToCheck.push(state); + + try { + while (!statesToCheck.isEmpty()) { + final State<T> currentState = statesToCheck.pop(); + for (StateTransition<T> transition : currentState.getStateTransitions()) { + if (transition.getAction() == StateTransitionAction.PROCEED && + checkFilterCondition(transition.getCondition(), event)) { + if (transition.getTargetState().isFinal()) { + return transition.getTargetState(); + } else { + statesToCheck.push(transition.getTargetState()); + } + } + } } + } catch (Exception e) { + throw new RuntimeException("Failure happened in filter function.", e); + } + + return null; + } + + private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { + return takeBranches == 0 && ignoreBranches == 0 ? 
0 : ignoreBranches + 1; + } + + private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) { + final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); + return ComputationState.createStartState(computationState.getState(), startVersion); + } - // impose the IGNORE will be processed last - Collections.sort(stateTransitions, stateTransitionComparator); + private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) { + final Stack<State<T>> states = new Stack<>(); + states.push(computationState.getState()); + final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState()); + //First create all outgoing edges, so to be able to reason about the Dewey version + while (!states.isEmpty()) { + State<T> currentState = states.pop(); + Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions(); // check all state transitions for each state - for (StateTransition<T> stateTransition: stateTransitions) { + for (StateTransition<T> stateTransition : stateTransitions) { try { - if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) { + if (checkFilterCondition(stateTransition.getCondition(), event)) { // filter condition is true switch (stateTransition.getAction()) { case PROCEED: @@ -310,73 +519,8 @@ public class NFA<T> implements Serializable { states.push(stateTransition.getTargetState()); break; case IGNORE: - final DeweyNumber version; - if (branched) { - version = computationState.getVersion().increase(); - } else { - version = computationState.getVersion(); - } - resultingComputationStates.add(new ComputationState<T>( - computationState.getState(), - computationState.getEvent(), - computationState.getTimestamp(), - version, - computationState.getStartTimestamp())); - - // we have a new computation state referring to the same the shared entry - // the lock of the current 
computation is released later on - sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); - break; case TAKE: - final State<T> newState = stateTransition.getTargetState(); - final DeweyNumber oldVersion; - final DeweyNumber newComputationStateVersion; - final State<T> previousState = computationState.getState(); - final T previousEvent = computationState.getEvent(); - final long previousTimestamp; - final long startTimestamp; - - if (computationState.isStartState()) { - oldVersion = new DeweyNumber(startEventCounter++); - newComputationStateVersion = oldVersion.addStage(); - startTimestamp = timestamp; - previousTimestamp = -1L; - - } else { - startTimestamp = computationState.getStartTimestamp(); - previousTimestamp = computationState.getTimestamp(); - oldVersion = computationState.getVersion(); - - branched = true; - newComputationStateVersion = oldVersion.addStage(); - } - - if (previousState.isStart()) { - sharedBuffer.put( - newState, - event, - timestamp, - oldVersion); - } else { - sharedBuffer.put( - newState, - event, - timestamp, - previousState, - previousEvent, - previousTimestamp, - oldVersion); - } - - // a new computation state is referring to the shared entry - sharedBuffer.lock(newState, event, timestamp); - - resultingComputationStates.add(new ComputationState<T>( - newState, - event, - timestamp, - newComputationStateVersion, - startTimestamp)); + outgoingEdges.add(stateTransition); break; } } @@ -385,19 +529,12 @@ public class NFA<T> implements Serializable { } } } + return outgoingEdges; + } - if (computationState.isStartState()) { - // a computation state is always kept if it refers to a starting state because every - // new element can start a new pattern - resultingComputationStates.add(computationState); - } else { - // release the shared entry referenced by the current computation state. 
- sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); - // try to remove unnecessary shared buffer entries - sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); - } - return resultingComputationStates; + private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception { + return condition == null || condition.filter(event); } /** @@ -409,8 +546,8 @@ public class NFA<T> implements Serializable { * @return Collection of event sequences which end in the given computation state */ private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) { - Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns( - computationState.getState(), + Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns( + computationState.getPreviousState().getName(), computationState.getEvent(), computationState.getTimestamp(), computationState.getVersion()); @@ -420,19 +557,20 @@ public class NFA<T> implements Serializable { TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); // generate the correct names from the collection of LinkedHashMultimaps - for (LinkedHashMultimap<State<T>, T> path: paths) { + for (LinkedHashMultimap<String, T> path: paths) { Map<String, T> resultPath = new HashMap<>(); - for (State<T> key: path.keySet()) { + for (String key: path.keySet()) { int counter = 0; Set<T> events = path.get(key); // we iterate over the elements in insertion order for (T event: events) { resultPath.put( - events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(), + events.size() > 1 ? generateStateName(key, counter): key, // copy the element so that the user can change it serializer.isImmutableType() ? 
event : serializer.copy(event) ); + counter++; } } @@ -472,6 +610,7 @@ public class NFA<T> implements Serializable { private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException { oos.writeObject(computationState.getState()); + oos.writeObject(computationState.getPreviousState()); oos.writeLong(computationState.getTimestamp()); oos.writeObject(computationState.getVersion()); oos.writeLong(computationState.getStartTimestamp()); @@ -490,6 +629,7 @@ public class NFA<T> implements Serializable { @SuppressWarnings("unchecked") private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException { final State<T> state = (State<T>)ois.readObject(); + final State<T> previousState = (State<T>)ois.readObject(); final long timestamp = ois.readLong(); final DeweyNumber version = (DeweyNumber)ois.readObject(); final long startTimestamp = ois.readLong(); @@ -504,7 +644,7 @@ public class NFA<T> implements Serializable { event = null; } - return new ComputationState<>(state, event, timestamp, version, startTimestamp); + return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp); } /** @@ -629,20 +769,4 @@ public class NFA<T> implements Serializable { return getClass().hashCode(); } } - - /** - * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state. - */ - private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> { - - private static final long serialVersionUID = -2775474935413622278L; - - @Override - public int compare(final StateTransition<T> o1, final StateTransition<T> o2) { - if (o1.getAction() == o2.getAction()) { - return 0; - } - return o1.getAction() == StateTransitionAction.IGNORE ? 
1 : -1; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index b7e288b..e6a8c75 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { SharedBufferEntry<K, V> entry = get(key, value, timestamp); if (entry != null) { - extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>())); + extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>())); // use a depth first search to reconstruct the previous relations while (!extractionStates.isEmpty()) { - ExtractionState<K, V> extractionState = extractionStates.pop(); - DeweyNumber currentVersion = extractionState.getVersion(); + final ExtractionState<K, V> extractionState = extractionStates.pop(); // current path of the depth first search - Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath(); + final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath(); + final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry(); // termination criterion - if (currentVersion.length() == 1) { - LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create(); + if (currentEntry == null) { + final LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create(); while(!currentPath.isEmpty()) { - SharedBufferEntry<K, V> currentEntry = currentPath.pop(); + final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop(); - 
completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue()); + completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue()); } result.add(completePath); } else { - SharedBufferEntry<K, V> currentEntry = extractionState.getEntry(); // append state to the path currentPath.push(currentEntry); @@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) { // we can only proceed if the current version is compatible to the version // of this previous relation + final DeweyNumber currentVersion = extractionState.getVersion(); if (currentVersion.isCompatibleWith(edge.getVersion())) { if (firstMatch) { // for the first match we don't have to copy the current path - extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath)); + extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath)); firstMatch = false; } else { - Stack<SharedBufferEntry<K, V>> copy = new Stack<>(); + final Stack<SharedBufferEntry<K, V>> copy = new Stack<>(); copy.addAll(currentPath); extractionStates.push( - new ExtractionState<K, V>( + new ExtractionState<>( edge.getTarget(), edge.getVersion(), copy)); @@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { } } } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java index 50b2cf3..7bcb6ea 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -18,6 +18,8 @@ 
package org.apache.flink.cep.nfa; +import org.apache.flink.api.common.functions.FilterFunction; + import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -43,7 +45,7 @@ public class State<T> implements Serializable { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList<StateTransition<T>>(); + stateTransitions = new ArrayList<>(); } public boolean isFinal() { @@ -60,8 +62,32 @@ public class State<T> implements Serializable { return stateTransitions; } - public void addStateTransition(final StateTransition<T> stateTransition) { - stateTransitions.add(stateTransition); + + private void addStateTransition( + final StateTransitionAction action, + final State<T> targetState, + final FilterFunction<T> condition) { + stateTransitions.add(new StateTransition<T>(this, action, targetState, condition)); + } + + public void addIgnore(final FilterFunction<T> condition) { + addStateTransition(StateTransitionAction.IGNORE, this, condition); + } + + public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) { + addStateTransition(StateTransitionAction.IGNORE, targetState, condition); + } + + public void addTake(final State<T> targetState, final FilterFunction<T> condition) { + addStateTransition(StateTransitionAction.TAKE, targetState, condition); + } + + public void addProceed(final State<T> targetState, final FilterFunction<T> condition) { + addStateTransition(StateTransitionAction.PROCEED, targetState, condition); + } + + public void addTake(final FilterFunction<T> condition) { + addStateTransition(StateTransitionAction.TAKE, this, condition); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java index 479f28a..e3c7b7a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java @@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable { private static final long serialVersionUID = -4825345749997891838L; private final StateTransitionAction action; + private final State<T> sourceState; private final State<T> targetState; private final FilterFunction<T> condition; - public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) { + public StateTransition( + final State<T> sourceState, + final StateTransitionAction action, + final State<T> targetState, + final FilterFunction<T> condition) { this.action = action; this.targetState = targetState; + this.sourceState = sourceState; this.condition = condition; } @@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable { return targetState; } + public State<T> getSourceState() { + return sourceState; + } + public FilterFunction<T> getCondition() { return condition; } @@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable { StateTransition<T> other = (StateTransition<T>) obj; return action == other.action && + sourceState.getName().equals(other.sourceState.getName()) && targetState.getName().equals(other.targetState.getName()); } else { return false; @@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable { @Override public int hashCode() { // we have to take the name of targetState because the transition might be reflexive - return Objects.hash(action, targetState.getName()); + return Objects.hash(action, targetState.getName(), sourceState.getName()); } @Override public String toString() { StringBuilder builder = new StringBuilder(); - 
builder.append("StateTransition(").append(action).append(", ").append(targetState.getName()); + builder.append("StateTransition(") + .append(action).append(", ") + .append(sourceState.getName()).append(", ") + .append(targetState.getName()); if (condition != null) { builder.append(", with filter)"); http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java index 70fc7fb..b8ca4e8 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java @@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa; * Set of actions when doing a state transition from a {@link State} to another. 
*/ public enum StateTransitionAction { - TAKE, // take the current event and assign it to the new state - IGNORE, // ignore the current event and do the state transition + TAKE, // take the current event and assign it to the current state + IGNORE, // ignore the current event PROCEED // do the state transition and keep the current event for further processing (epsilon transition) } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java deleted file mode 100644 index a3bb5f4..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.cep.nfa.compiler; - -/** - * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern} - * was not specified correctly. - */ -public class MalformedPatternException extends RuntimeException { - - private static final long serialVersionUID = 7751134834983361543L; - - public MalformedPatternException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 18ed21f..b476c49 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.State; -import org.apache.flink.cep.nfa.StateTransition; -import org.apache.flink.cep.nfa.StateTransitionAction; +import org.apache.flink.cep.pattern.FilterFunctions; import org.apache.flink.cep.pattern.FollowedByPattern; +import org.apache.flink.cep.pattern.MalformedPatternException; +import org.apache.flink.cep.pattern.NotFilterFunction; import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.Quantifier; +import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty; import org.apache.flink.streaming.api.windowing.time.Time; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import 
java.util.Map; +import java.util.List; import java.util.Set; /** @@ -42,7 +45,7 @@ import java.util.Set; */ public class NFACompiler { - protected final static String BEGINNING_STATE_NAME = "$beginningState$"; + protected static final String ENDING_STATE_NAME = "$endState$"; /** * Compiles the given pattern into a {@link NFA}. @@ -74,88 +77,288 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") public static <T> NFAFactory<T> compileFactory( - Pattern<T, ?> pattern, - TypeSerializer<T> inputTypeSerializer, + final Pattern<T, ?> pattern, + final TypeSerializer<T> inputTypeSerializer, boolean timeoutHandling) { if (pattern == null) { // return a factory for empty NFAs - return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling); + return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling); } else { - // set of all generated states - Map<String, State<T>> states = new HashMap<>(); - long windowTime; + final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern); + nfaFactoryCompiler.compileFactory(); + return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling); + } + } + + /** + * Converts a {@link Pattern} into graph of {@link State}. It enables sharing of + * compilation state across methods. + * + * @param <T> + */ + private static class NFAFactoryCompiler<T> { + + private final Set<String> usedNames = new HashSet<>(); + private final List<State<T>> states = new ArrayList<>(); - // this is used to enforse pattern name uniqueness. 
- Set<String> patternNames = new HashSet<>(); + private long windowTime = 0; + private Pattern<T, ?> currentPattern; - Pattern<T, ?> succeedingPattern; - State<T> succeedingState; - Pattern<T, ?> currentPattern = pattern; + NFAFactoryCompiler(final Pattern<T, ?> pattern) { + this.currentPattern = pattern; + } + /** + * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create + * multiple NFAs. + */ + void compileFactory() { // we're traversing the pattern from the end to the beginning --> the first state is the final state - State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final); - patternNames.add(currentPattern.getName()); + State<T> sinkState = createEndingState(); + // add all the normal states + sinkState = createMiddleStates(sinkState); + // add the beginning state + createStartState(sinkState); + } - states.put(currentPattern.getName(), currentState); + List<State<T>> getStates() { + return states; + } + + long getWindowTime() { + return windowTime; + } + + /** + * Creates the dummy Final {@link State} of the NFA graph. + * @return dummy Final state + */ + private State<T> createEndingState() { + State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final); + states.add(endState); + usedNames.add(ENDING_STATE_NAME); windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L; + return endState; + } - while (currentPattern.getPrevious() != null) { - succeedingPattern = currentPattern; - succeedingState = currentState; - currentPattern = currentPattern.getPrevious(); + /** + * Creates all the states between Start and Final state. 
+ * @param sinkState the state that last state should point to (always the Final state) + * @return the next state after Start in the resulting graph + */ + private State<T> createMiddleStates(final State<T> sinkState) { - if (!patternNames.add(currentPattern.getName())) { - throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " + - "Pattern names must be unique."); + State<T> lastSink = sinkState; + while (currentPattern.getPrevious() != null) { + checkPatternNameUniqueness(); + + State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal); + states.add(sourceState); + usedNames.add(sourceState.getName()); + + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { + convertToLooping(sourceState, lastSink); + + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) { + sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal); + states.add(sourceState); + usedNames.add(sourceState.getName()); + } + } else if (currentPattern.getQuantifier() == Quantifier.TIMES) { + sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes()); + } else { + convertToSingletonState(sourceState, lastSink); } - Time currentWindowTime = currentPattern.getWindowTime(); + currentPattern = currentPattern.getPrevious(); + lastSink = sourceState; + final Time currentWindowTime = currentPattern.getWindowTime(); if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) { // the window time is the global minimum of all window times of each state windowTime = currentWindowTime.toMilliseconds(); } + } + + return lastSink; + } + + private void checkPatternNameUniqueness() { + if (usedNames.contains(currentPattern.getName())) { + throw new MalformedPatternException( + "Duplicate pattern name: " + currentPattern.getName() + ". 
" + + "Pattern names must be unique."); + } + } + + /** + * Creates the Start {@link State} of the resulting NFA graph. + * @param sinkState the state that Start state should point to (alwyas first state of middle states) + * @return created state + */ + @SuppressWarnings("unchecked") + private State<T> createStartState(State<T> sinkState) { + checkPatternNameUniqueness(); + + final State<T> beginningState; + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { + final State<T> loopingState; + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) { + loopingState = new State<>(currentPattern.getName(), State.StateType.Normal); + beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start); + states.add(loopingState); + } else { + loopingState = new State<>(currentPattern.getName(), State.StateType.Start); + beginningState = loopingState; + } + convertToLooping(loopingState, sinkState, true); + } else { + if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) { + final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal); + states.add(timesState); + sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1); + } - if (states.containsKey(currentPattern.getName())) { - currentState = states.get(currentPattern.getName()); + beginningState = new State<>(currentPattern.getName(), State.StateType.Start); + convertToSingletonState(beginningState, sinkState); + } + + states.add(beginningState); + usedNames.add(beginningState.getName()); + + return beginningState; + } + + /** + * Converts the given state into a "complex" state consisting of given number of states with + * same {@link FilterFunction} + * + * @param sourceState the state to be converted + * @param sinkState the state that the converted state should point to + * @param times number of times the state should be copied + * @return the first state 
of the "complex" state, next state should point to it + */ + private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) { + convertToSingletonState(sourceState, sinkState); + State<T> lastSink; + State<T> firstState = sourceState; + for (int i = 0; i < times - 1; i++) { + lastSink = firstState; + firstState = new State<>(currentPattern.getName(), State.StateType.Normal); + states.add(firstState); + convertToSingletonState(firstState, lastSink); + } + return firstState; + } + + /** + * Converts the given state into a simple single state. For an OPTIONAL state it also consists + * of a similar state without the PROCEED edge, so that for each PROCEED transition branches + * in computation state graph can be created only once. + * + * @param sourceState the state to be converted + * @param sinkState state that the state being converted should point to + */ + @SuppressWarnings("unchecked") + private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) { + + final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction(); + final FilterFunction<T> trueFunction = FilterFunctions.trueFunction(); + sourceState.addTake(sinkState, currentFilterFunction); + + if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) { + sourceState.addProceed(sinkState, trueFunction); + } + + if (currentPattern instanceof FollowedByPattern) { + final State<T> ignoreState; + if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) { + ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal); + ignoreState.addTake(sinkState, currentFilterFunction); + states.add(ignoreState); } else { - currentState = new State<>(currentPattern.getName(), State.StateType.Normal); - states.put(currentState.getName(), currentState); + ignoreState = sourceState; } + sourceState.addIgnore(ignoreState, trueFunction); + } + } - currentState.addStateTransition(new StateTransition<T>( - 
StateTransitionAction.TAKE, - succeedingState, - (FilterFunction<T>) succeedingPattern.getFilterFunction())); - - if (succeedingPattern instanceof FollowedByPattern) { - // the followed by pattern entails a reflexive ignore transition - currentState.addStateTransition(new StateTransition<T>( - StateTransitionAction.IGNORE, - currentState, - null - )); + /** + * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and + * looping state. This method creates the first of the two. + * + * @param sinkState the state the newly created state should point to, it should be a looping state + * @param stateType the type of the created state, as the NFA graph can also start wit AT_LEAST_ONE_* + * @return the newly created state + */ + @SuppressWarnings("unchecked") + private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) { + + final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction(); + final State<T> firstState = new State<>(currentPattern.getName(), stateType); + + firstState.addTake(sinkState, currentFilterFunction); + if (currentPattern instanceof FollowedByPattern) { + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) { + firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction)); + } else { + firstState.addIgnore(FilterFunctions.<T>trueFunction()); } } + return firstState; + } - // add the beginning state - final State<T> beginningState; + /** + * Converts the given state into looping one. Looping state is one with TAKE edge to itself and + * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that + * for each PROCEED transition branches in computation state graph can be created only once. + * + * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern} + * to enable combinations. 
+ * + * @param sourceState the state to converted + * @param sinkState the state that the converted state should point to + * @param isFirstState if the looping state is first of a graph + */ + @SuppressWarnings("unchecked") + private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) { + + final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction(); + final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction(); + + sourceState.addProceed(sinkState, trueFunction); + sourceState.addTake(filterFunction); + if (currentPattern instanceof FollowedByPattern || isFirstState) { + final State<T> ignoreState = new State<>( + currentPattern.getName(), + State.StateType.Normal); + + + final FilterFunction<T> ignoreCondition; + if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) { + ignoreCondition = new NotFilterFunction<>(filterFunction); + } else { + ignoreCondition = trueFunction; + } - if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); + sourceState.addIgnore(ignoreState, ignoreCondition); + ignoreState.addTake(sourceState, filterFunction); + ignoreState.addIgnore(ignoreState, ignoreCondition); + states.add(ignoreState); } + } - beginningState.addStateTransition(new StateTransition<T>( - StateTransitionAction.TAKE, - currentState, - (FilterFunction<T>) currentPattern.getFilterFunction() - )); - - return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + /** + * Converts the given state into looping one. Looping state is one with TAKE edge to itself and + * PROCEED edge to the sinkState. 
It also consists of a similar state without the PROCEED edge, so that + * for each PROCEED transition branches in computation state graph can be created only once. + * + * @param sourceState the state to converted + * @param sinkState the state that the converted state should point to + */ + private void convertToLooping(final State<T> sourceState, final State<T> sinkState) { + convertToLooping(sourceState, sinkState, false); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java new file mode 100644 index 0000000..12e58ba --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +public class FilterFunctions<T> { + + private FilterFunctions() { + } + + public static <T> FilterFunction<T> trueFunction() { + return new FilterFunction<T>() { + @Override + public boolean filter(T value) throws Exception { + return true; + } + }; + } + + public static <T> FilterFunction<T> falseFunction() { + return new FilterFunction<T>() { + @Override + public boolean filter(T value) throws Exception { + return false; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java new file mode 100644 index 0000000..c85f3be --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.pattern; + +/** + * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern} + * was not specified correctly. + */ +public class MalformedPatternException extends RuntimeException { + + private static final long serialVersionUID = 7751134834983361543L; + + public MalformedPatternException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java new file mode 100644 index 0000000..a4fc8f5 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which negates filter function. 
+ * + * @param <T> Type of the element to filter + */ +public class NotFilterFunction<T> implements FilterFunction<T> { + private static final long serialVersionUID = -2109562093871155005L; + + private final FilterFunction<T> original; + + public NotFilterFunction(final FilterFunction<T> original) { + this.original = original; + } + + @Override + public boolean filter(T value) throws Exception { + return !original.filter(value); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 7ea675f..7b4d9c7 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Preconditions; /** * Base class for a pattern definition. @@ -53,6 +54,10 @@ public class Pattern<T, F extends T> { // window length in which the pattern match has to occur private Time windowTime; + private Quantifier quantifier = Quantifier.ONE; + + private int times; + protected Pattern(final String name, final Pattern<T, ? extends T> previous) { this.name = name; this.previous = previous; @@ -74,6 +79,14 @@ public class Pattern<T, F extends T> { return windowTime; } + public Quantifier getQuantifier() { + return quantifier; + } + + public int getTimes() { + return times; + } + /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. 
* @@ -183,4 +196,106 @@ public class Pattern<T, F extends T> { return new Pattern<X, X>(name, null); } + /** + * Specifies that this pattern can occur zero or more times(kleene star). + * This means any number of events can be matched in this state. + * + * @return The same pattern with applied Kleene star operator + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> zeroOrMore() { + return zeroOrMore(true); + } + + /** + * Specifies that this pattern can occur zero or more times(kleene star). + * This means any number of events can be matched in this state. + * + * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns: + * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B. + * + * @param eager if true the pattern always consumes earlier events + * @return The same pattern with applied Kleene star operator + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> zeroOrMore(final boolean eager) { + checkIfQuantifierApplied(); + if (eager) { + this.quantifier = Quantifier.ZERO_OR_MORE_EAGER; + } else { + this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS; + } + return this; + } + + /** + * Specifies that this pattern can occur one or more times(kleene star). + * This means at least one and at most infinite number of events can be matched in this state. + * + * @return The same pattern with applied Kleene plus operator + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> oneOrMore() { + return oneOrMore(true); + } + + /** + * Specifies that this pattern can occur one or more times(kleene star). + * This means at least one and at most infinite number of events can be matched in this state. + * + * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns: + * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B. 
+ * + * @param eager if true the pattern always consumes earlier events + * @return The same pattern with applied Kleene plus operator + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> oneOrMore(final boolean eager) { + checkIfQuantifierApplied(); + if (eager) { + this.quantifier = Quantifier.ONE_OR_MORE_EAGER; + } else { + this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS; + } + return this; + } + + /** + * Specifies that this pattern can occur zero or once. + * + * @return The same pattern with applied Kleene ? operator + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> optional() { + checkIfQuantifierApplied(); + this.quantifier = Quantifier.OPTIONAL; + return this; + } + + /** + * Specifies exact number of times that this pattern should be matched. + * + * @param times number of times matching event must appear + * @return The same pattern with number of times applied + * + * @throws MalformedPatternException if quantifier already applied + */ + public Pattern<T, F> times(int times) { + checkIfQuantifierApplied(); + Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); + this.quantifier = Quantifier.TIMES; + this.times = times; + return this; + } + + private void checkIfQuantifierApplied() { + if (this.quantifier != Quantifier.ONE) { + throw new MalformedPatternException("Already applied quantifier to this Pattern. 
Current quantifier is: " + this.quantifier); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java new file mode 100644 index 0000000..7abe9bd --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.cep.pattern; + +import java.util.EnumSet; + +public enum Quantifier { + ONE, + ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER), + ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING), + ONE_OR_MORE_EAGER( + QuantifierProperty.LOOPING, + QuantifierProperty.EAGER, + QuantifierProperty.AT_LEAST_ONE), + ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE), + TIMES, + OPTIONAL; + + private final EnumSet<QuantifierProperty> properties; + + Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { + this.properties = EnumSet.of(first, rest); + } + + Quantifier() { + this.properties = EnumSet.noneOf(QuantifierProperty.class); + } + + public boolean hasProperty(QuantifierProperty property) { + return properties.contains(property); + } + + public enum QuantifierProperty { + LOOPING, + EAGER, + AT_LEAST_ONE + } + +}