[FLINK-6208] [cep] Implement skip till next match strategy
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c35dc0e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c35dc0e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c35dc0e Branch: refs/heads/master Commit: 7c35dc0e40a96f6b8d367b9a80a19e9a22525043 Parents: b527582 Author: Dawid Wysakowicz <da...@getindata.com> Authored: Wed Apr 26 11:57:53 2017 +0200 Committer: kl0u <kklou...@gmail.com> Committed: Fri Apr 28 14:28:37 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 19 +- .../cep/scala/pattern/FollowedByPattern.scala | 44 -- .../flink/cep/scala/pattern/Pattern.scala | 31 +- .../flink/cep/scala/pattern/package.scala | 3 +- .../flink/cep/scala/pattern/PatternTest.scala | 52 +- .../flink/cep/nfa/compiler/NFACompiler.java | 192 ++--- .../flink/cep/pattern/FollowedByPattern.java | 33 - .../org/apache/flink/cep/pattern/Pattern.java | 42 +- .../apache/flink/cep/pattern/Quantifier.java | 82 +- .../java/org/apache/flink/cep/CEPITCase.java | 30 +- .../org/apache/flink/cep/nfa/NFAITCase.java | 760 +++++++++---------- .../flink/cep/nfa/compiler/NFACompilerTest.java | 39 - .../flink/cep/operator/CEPOperatorTest.java | 5 +- .../apache/flink/cep/pattern/PatternTest.java | 9 +- 14 files changed, 656 insertions(+), 685 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 15afdf5..b379615 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -268,7 +268,7 @@ val strictNext: Pattern[Event, _] = start.next("middle") </div> Non-strict contiguity means that other events are allowed to occur in-between two matching events. 
-A non-strict contiguity pattern state can be created via the `followedBy` method. +A non-strict contiguity pattern state can be created via the `followedBy` or `followedByAny` method. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -282,6 +282,23 @@ Pattern<Event, ?> nonStrictNext = start.followedBy("middle"); val nonStrictNext : Pattern[Event, _] = start.followedBy("middle") {% endhighlight %} </div> +</div> + +For non-strict contiguity one can specify whether only the first succeeding matching event will be matched (`followedBy`), or +all of them (`followedByAny`). In the latter case multiple matches will be emitted for the same beginning. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Pattern<Event, ?> nonStrictNext = start.followedByAny("middle"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle") +{% endhighlight %} +</div> </div> It is also possible to define a temporal constraint for the pattern to be valid. For example, one can define that a pattern should occur within 10 seconds via the `within` method. http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala deleted file mode 100644 index 4bda08f..0000000 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.cep.scala.pattern - -import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern} - -object FollowedByPattern { - /** - * Constructs a new Pattern by wrapping a given Java API Pattern - * - * @param jfbPattern Underlying Java API Pattern. - * @tparam T Base type of the elements appearing in the pattern - * @tparam F Subtype of T to which the current pattern operator is constrained - * @return New wrapping FollowedByPattern object - */ - def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) = - new FollowedByPattern[T, F](jfbPattern) -} - -/** - * Pattern operator which signifies that the there is a non-strict temporal contiguity between - * itself and its preceding pattern operator. This means that there might be events in between - * two matching events. These events are then simply ignored. 
- * - * @tparam T Base type of the events - * @tparam F Subtype of T to which the operator is currently constrained - */ -class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) - extends Pattern[T, F](jfbPattern) http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index 3bdbfcf..65b7ab0 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -46,22 +46,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { /** * @return The previous pattern */ - def getPrevious(): Option[Pattern[T, _ <: T]] = { - wrapPattern(jPattern.getPrevious()) + def getPrevious: Option[Pattern[T, _ <: T]] = { + wrapPattern(jPattern.getPrevious) } /** * * @return Name of the pattern operator */ - def getName(): String = jPattern.getName() + def getName: String = jPattern.getName /** * * @return Window length in which the pattern match has to occur */ - def getWindowTime(): Option[Time] = { - Option(jPattern.getWindowTime()) + def getWindowTime: Option[Time] = { + Option(jPattern.getWindowTime) } /** @@ -70,8 +70,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { */ def getQuantifier: Quantifier = jPattern.getQuantifier - def getCondition(): Option[IterativeCondition[F]] = { - Option(jPattern.getCondition()) + def getCondition: Option[IterativeCondition[F]] = { + Option(jPattern.getCondition) } /** @@ -208,7 +208,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { * @param name Name of the new pattern * @return A new pattern 
which is appended to this one */ - def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name)) + def followedBy(name: String): Pattern[T, T] = { + Pattern[T, T](jPattern.followedBy(name)) + } + + /** + * Appends a new pattern to the existing one. The new pattern enforces non-strict + * temporal contiguity, but unlike `followedBy` every succeeding matching event is + * considered (not only the first one), so multiple matches may be emitted for one beginning. + * + * @param name Name of the new pattern + * @return A new pattern which is appended to this one + */ + def followedByAny(name: String): Pattern[T, T] = { + Pattern[T, T](jPattern.followedByAny(name)) + } + /** * Specifies that this pattern is optional for a final match of the pattern http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala index 382c160..26355a5 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.cep.scala -import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} +import org.apache.flink.cep.pattern.{Pattern => JPattern} package object pattern { /** @@ -31,7 +31,6 @@ package object pattern { */ private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]) : Option[Pattern[T, F]] = javaPattern match { - case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f)) case p: JPattern[T, F] => Some(Pattern[T, F](p)) case _ => None }
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index a95dddd..d574513 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -22,7 +22,7 @@ import org.junit.Assert._ import org.junit.Test import org.apache.flink.cep.Event import org.apache.flink.cep.SubEvent -import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context +import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy import org.apache.flink.cep.pattern.conditions._ class PatternTest { @@ -33,7 +33,7 @@ class PatternTest { */ @Test - def testStrictContiguity: Unit = { + def testStrictContiguity(): Unit = { val pattern = Pattern.begin[Event]("start").next("next").next("end") val jPattern = JPattern.begin[Event]("start").next("next").next("end") @@ -56,7 +56,7 @@ class PatternTest { @Test - def testNonStrictContiguity: Unit = { + def testNonStrictContiguity(): Unit = { val pattern = Pattern.begin[Event]("start").followedBy("next").followedBy("end") val jPattern = JPattern.begin[Event]("start").followedBy("next").followedBy("end") @@ -69,8 +69,8 @@ class PatternTest { assertTrue(previous.getPrevious.isDefined) assertFalse(preprevious.getPrevious.isDefined) - assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]]) - assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]]) + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy) + assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, 
previous.getQuantifier.getConsumingStrategy) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "next") @@ -78,25 +78,25 @@ class PatternTest { } @Test - def testStrictContiguityWithCondition: Unit = { + def testStrictContiguityWithCondition(): Unit = { val pattern = Pattern.begin[Event]("start") .next("next") - .where((value: Event, ctx: Context[Event]) => value.getName() == "foobar") + .where((value: Event, _) => value.getName == "foobar") .next("end") - .where((value: Event, ctx: Context[Event]) => value.getId() == 42) + .where((value: Event, _) => value.getId == 42) val jPattern = JPattern.begin[Event]("start") .next("next") .where(new SimpleCondition[Event]() { @throws[Exception] def filter(value: Event): Boolean = { - return value.getName() == "foobar" + value.getName == "foobar" } }).next("end") .where(new SimpleCondition[Event]() { @throws[Exception] def filter(value: Event): Boolean = { - return value.getId() == 42 + value.getId == 42 } }) @@ -120,7 +120,7 @@ class PatternTest { } @Test - def testPatternWithSubtyping: Unit = { + def testPatternWithSubtyping(): Unit = { val pattern = Pattern.begin[Event]("start") .next("subevent") .subtype(classOf[SubEvent]) @@ -150,11 +150,11 @@ class PatternTest { } @Test - def testPatternWithSubtypingAndFilter: Unit = { + def testPatternWithSubtypingAndFilter(): Unit = { val pattern = Pattern.begin[Event]("start") .next("subevent") .subtype(classOf[SubEvent]) - .where((value: SubEvent) => false) + .where(_ => false) .followedBy("end") val jpattern = JPattern.begin[Event]("start") @@ -163,7 +163,7 @@ class PatternTest { .where(new SimpleCondition[SubEvent]() { @throws[Exception] def filter(value: SubEvent): Boolean = { - return false + false } }).followedBy("end") @@ -178,8 +178,8 @@ class PatternTest { assertTrue(previous.getPrevious.isDefined) assertFalse(preprevious.getPrevious.isDefined) - assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]]) - assertTrue(previous.getCondition().isDefined) + 
assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy) + assertTrue(previous.getCondition.isDefined) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "subevent") @@ -194,25 +194,25 @@ class PatternTest { && threeWayEquals( pattern.getName, pattern.wrappedPattern.getName, - jPattern.getName()) + jPattern.getName) //check equal time windows && threeWayEquals( pattern.getWindowTime.orNull, pattern.wrappedPattern.getWindowTime, - jPattern.getWindowTime()) + jPattern.getWindowTime) //check congruent class names / types && threeWayEquals( pattern.getClass.getSimpleName, pattern.wrappedPattern.getClass.getSimpleName, - jPattern.getClass().getSimpleName()) + jPattern.getClass.getSimpleName) //best effort to confirm congruent filter functions && compareFilterFunctions( - pattern.getCondition().orNull, - jPattern.getCondition()) + pattern.getCondition.orNull, + jPattern.getCondition) //recursively check previous patterns && checkCongruentRepresentations( pattern.getPrevious.orNull, - jPattern.getPrevious())) + jPattern.getPrevious)) } def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = { @@ -233,15 +233,15 @@ class PatternTest { (sFilter, jFilter) match { //matching types: and-filter; branch and recurse for inner filters case (saf: AndCondition[_], jaf: AndCondition[_]) - => (compareFilterFunctions(saf.getLeft(), jaf.getLeft()) - && compareFilterFunctions(saf.getRight(), jaf.getRight())) + => (compareFilterFunctions(saf.getLeft, jaf.getLeft) + && compareFilterFunctions(saf.getRight, jaf.getRight)) //matching types: subtype-filter - case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true + case (_: SubtypeCondition[_], _: SubtypeCondition[_]) => true //mismatch: one-sided and/subtype-filter case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false //from here we can only check mutual presence or absence of a function - case (s: 
IterativeCondition[_], j: IterativeCondition[_]) => true + case (_: IterativeCondition[_], _: IterativeCondition[_]) => true case (null, null) => true case _ => false } http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index b4e0557..0ca0e14 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -20,32 +20,30 @@ package org.apache.flink.cep.nfa.compiler; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.State; import org.apache.flink.cep.nfa.StateTransition; import org.apache.flink.cep.nfa.StateTransitionAction; -import org.apache.flink.cep.pattern.conditions.BooleanConditions; -import org.apache.flink.cep.pattern.FollowedByPattern; import org.apache.flink.cep.pattern.MalformedPatternException; -import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.cep.pattern.Pattern; -import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty; +import org.apache.flink.cep.pattern.Quantifier; +import 
org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; +import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.streaming.api.windowing.time.Time; -import javax.annotation.Nullable; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a * {@link NFAFactory}. @@ -159,25 +157,7 @@ public class NFACompiler { State<T> lastSink = sinkState; while (currentPattern.getPrevious() != null) { - checkPatternNameUniqueness(); - usedNames.add(currentPattern.getName()); - - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { - final State<T> looping = createLooping(lastSink); - - if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { - lastSink = createFirstMandatoryStateOfLoop(looping); - } else if (currentPattern instanceof FollowedByPattern && - currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) { - lastSink = createWaitingStateForZeroOrMore(looping, lastSink); - } else { - lastSink = looping; - } - } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) { - lastSink = createTimesState(lastSink, currentPattern.getTimes()); - } else { - lastSink = createSingletonState(lastSink); - } + lastSink = convertPattern(lastSink); currentPattern = currentPattern.getPrevious(); final Time currentWindowTime = currentPattern.getWindowTime(); @@ -190,6 +170,29 @@ public class NFACompiler { return lastSink; } + private State<T> convertPattern(final State<T> sinkState) { + final State<T> lastSink; + checkPatternNameUniqueness(); + usedNames.add(currentPattern.getName()); + + final Quantifier quantifier = 
currentPattern.getQuantifier(); + if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) { + final State<T> looping = createLooping(sinkState); + + if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { + lastSink = createFirstMandatoryStateOfLoop(looping); + } else { + lastSink = createWaitingStateForZeroOrMore(looping, sinkState); + } + } else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) { + lastSink = createTimesState(sinkState, currentPattern.getTimes()); + } else { + lastSink = createSingletonState(sinkState); + } + + return lastSink; + } + /** * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state. * @@ -197,19 +200,21 @@ public class NFACompiler { * @param lastSink the state that the looping one points to * @return the newly created state */ + @SuppressWarnings("unchecked") private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) { - final State<T> followByState = createNormalState(); - final State<T> followByStateWithoutProceed = createNormalState(); - final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition(); - final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern); + final State<T> followByState = createNormalState(); followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction()); - followByState.addIgnore(followByStateWithoutProceed, ignoreFunction); followByState.addTake(loopingState, currentFunction); - followByStateWithoutProceed.addIgnore(ignoreFunction); - followByStateWithoutProceed.addTake(loopingState, currentFunction); + final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern); + if (ignoreFunction != null) { + final State<T> followByStateWithoutProceed = createNormalState(); + followByState.addIgnore(followByStateWithoutProceed, ignoreFunction); + followByStateWithoutProceed.addIgnore(ignoreFunction); + 
followByStateWithoutProceed.addTake(loopingState, currentFunction); + } return followByState; } @@ -230,23 +235,7 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createStartState(State<T> sinkState) { - checkPatternNameUniqueness(); - usedNames.add(currentPattern.getName()); - - final State<T> beginningState; - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { - final State<T> loopingState = createLooping(sinkState); - if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { - beginningState = createFirstMandatoryStateOfLoop(loopingState); - } else { - beginningState = loopingState; - } - } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) { - beginningState = createTimesState(sinkState, currentPattern.getTimes()); - } else { - beginningState = createSingletonState(sinkState); - } - + final State<T> beginningState = convertPattern(sinkState); beginningState.makeStart(); return beginningState; @@ -263,29 +252,26 @@ public class NFACompiler { private State<T> createTimesState(final State<T> sinkState, int times) { State<T> lastSink = sinkState; for (int i = 0; i < times - 1; i++) { - lastSink = createSingletonState( - lastSink, - !currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE), - false); + lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false); } + final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition(); + final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); + // we created the intermediate states in the loop, now we create the start of the loop. 
- if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { - return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern, false); + if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { + return createSingletonState(lastSink, ignoreCondition, false); } - final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition(); - final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); - final State<T> singletonState = createNormalState(); singletonState.addTake(lastSink, currentFilterFunction); - singletonState.addProceed(sinkState, trueFunction); + singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction()); - if (currentPattern instanceof FollowedByPattern) { + if (ignoreCondition != null) { State<T> ignoreState = createNormalState(); ignoreState.addTake(lastSink, currentFilterFunction); - ignoreState.addIgnore(trueFunction); - singletonState.addIgnore(ignoreState, trueFunction); + ignoreState.addIgnore(ignoreCondition); + singletonState.addIgnore(ignoreState, ignoreCondition); } return singletonState; } @@ -300,8 +286,10 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createSingletonState(final State<T> sinkState) { - return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern, - currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)); + return createSingletonState( + sinkState, + getIgnoreCondition(currentPattern), + currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)); } /** @@ -309,12 +297,12 @@ public class NFACompiler { * of a similar state without the PROCEED edge, so that for each PROCEED transition branches * in computation state graph can be created only once. 
* - * @param addIgnore if any IGNORE should be added + * @param ignoreCondition condition that should be applied to IGNORE transition * @param sinkState state that the state being converted should point to * @return the created state */ @SuppressWarnings("unchecked") - private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore, boolean isOptional) { + private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) { final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition(); final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); @@ -325,7 +313,7 @@ public class NFACompiler { singletonState.addProceed(sinkState, trueFunction); } - if (addIgnore) { + if (ignoreCondition != null) { final State<T> ignoreState; if (isOptional) { ignoreState = createNormalState(); @@ -333,7 +321,7 @@ public class NFACompiler { } else { ignoreState = singletonState; } - singletonState.addIgnore(ignoreState, getIgnoreCondition(currentPattern)); + singletonState.addIgnore(ignoreState, ignoreCondition); } return singletonState; } @@ -352,8 +340,8 @@ public class NFACompiler { final State<T> firstState = createNormalState(); firstState.addTake(sinkState, currentFilterFunction); - if (currentPattern instanceof FollowedByPattern) { - final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); + final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); + if (ignoreCondition != null) { firstState.addIgnore(ignoreCondition); } return firstState; @@ -369,18 +357,16 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createLooping(final State<T> sinkState) { - - final State<T> loopingState = createNormalState(); final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition(); + final IterativeCondition<T> ignoreCondition = 
getInnerIgnoreCondition(currentPattern); final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); + final State<T> loopingState = createNormalState(); loopingState.addProceed(sinkState, trueFunction); loopingState.addTake(filterFunction); - if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) { - final State<T> ignoreState = createNormalState(); - - final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); + if (ignoreCondition != null) { + final State<T> ignoreState = createNormalState(); ignoreState.addTake(loopingState, filterFunction); ignoreState.addIgnore(ignoreCondition); loopingState.addIgnore(ignoreState, ignoreCondition); @@ -403,15 +389,37 @@ public class NFACompiler { /** * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge - * that corresponds to the specified {@link Pattern}. If the pattern is - * {@link QuantifierProperty#EAGER}, the negated user-specified condition is - * returned. In other case, a condition that always evaluated to {@code true} is - * returned. + * that corresponds to the specified {@link Pattern}. It is applicable only for inner states of a complex + * state like looping or times. */ + @SuppressWarnings("unchecked") + private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) { + switch (pattern.getQuantifier().getInnerConsumingStrategy()) { + case STRICT: + return null; + case SKIP_TILL_NEXT: + return new NotCondition<>((IterativeCondition<T>) pattern.getCondition()); + case SKIP_TILL_ANY: + return BooleanConditions.trueFunction(); + } + return null; + } + + /** + * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge + * that corresponds to the specified {@link Pattern}. 
For more on strategy see {@link Quantifier} + */ + @SuppressWarnings("unchecked") private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) { - return pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER) - ? new NotCondition<>((IterativeCondition<T>) pattern.getCondition()) - : BooleanConditions.<T>trueFunction(); + switch (pattern.getQuantifier().getConsumingStrategy()) { + case STRICT: + return null; + case SKIP_TILL_NEXT: + return new NotCondition<>((IterativeCondition<T>) pattern.getCondition()); + case SKIP_TILL_ANY: + return BooleanConditions.trueFunction(); + } + return null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java deleted file mode 100644 index 266451c..0000000 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.pattern; - -/** - * Pattern operator which signifies that the there is a non-strict temporal contiguity between - * itself and its preceding pattern operator. This means that there might be events in between - * two matching events. These events are then simply ignored. - * - * @param <T> Base type of the events - * @param <F> Subtype of T to which the operator is currently constrained - */ -public class FollowedByPattern<T, F extends T> extends Pattern<T, F> { - FollowedByPattern(final String name, Pattern<T, ?> previous) { - super(name, previous); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 3ce0c73..cef0f85 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy; import org.apache.flink.cep.pattern.conditions.AndCondition; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.OrCondition; @@ -56,8 +57,8 @@ public class Pattern<T, F extends T> { /** Window length in which the pattern match has to occur. */ private Time windowTime; - /** A quantifier for the pattern. By default set to {@link Quantifier#ONE()}. 
*/ - private Quantifier quantifier = Quantifier.ONE(); + /** A quantifier for the pattern. By default set to {@link Quantifier#ONE(ConsumingStrategy)}. */ + private Quantifier quantifier = Quantifier.ONE(ConsumingStrategy.STRICT); /** * Applicable to a {@code times} pattern, and holds @@ -70,6 +71,15 @@ public class Pattern<T, F extends T> { this.previous = previous; } + protected Pattern( + final String name, + final Pattern<T, ? extends T> previous, + final ConsumingStrategy consumingStrategy) { + this.name = name; + this.previous = previous; + this.quantifier = Quantifier.ONE(consumingStrategy); + } + public Pattern<T, ? extends T> getPrevious() { return previous; } @@ -195,7 +205,19 @@ public class Pattern<T, F extends T> { * @return A new pattern which is appended to this one */ public Pattern<T, T> next(final String name) { - return new Pattern<T, T>(name, this); + return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT); + } + + /** + * Appends a new pattern to the existing one. The new pattern enforces non-strict + * temporal contiguity. This means that a matching event of this pattern and the + * preceding matching event might be interleaved with other events which are ignored. 
+ * + * @param name Name of the new pattern + * @return A new pattern which is appended to this one + */ + public Pattern<T, T> followedBy(final String name) { + return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_NEXT); } /** @@ -206,8 +228,8 @@ public class Pattern<T, F extends T> { * @param name Name of the new pattern * @return A new pattern which is appended to this one */ - public FollowedByPattern<T, T> followedBy(final String name) { - return new FollowedByPattern<T, T>(name, this); + public Pattern<T, T> followedByAny(final String name) { + return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_ANY); } /** @@ -232,12 +254,12 @@ public class Pattern<T, F extends T> { * {@code A1 A2 B} appears, this will generate patterns: * {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}. * - * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} quantifier applied. + * @return The same pattern with a {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied. * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> oneOrMore() { checkIfQuantifierApplied(); - this.quantifier = Quantifier.ONE_OR_MORE(); + this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy()); return this; } @@ -252,14 +274,14 @@ public class Pattern<T, F extends T> { public Pattern<T, F> times(int times) { checkIfQuantifierApplied(); Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); - this.quantifier = Quantifier.TIMES(); + this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy()); this.times = times; return this; } /** - * Applicable only to {@link Quantifier#ONE_OR_MORE()} and {@link Quantifier#TIMES()} patterns, - * this option allows more flexibility to the matching events. 
+ * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} and + * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events. * * <p>If {@code allowCombinations()} is not applied for a * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index 0332ed4..b0f882c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -24,48 +24,62 @@ public class Quantifier { private final EnumSet<QuantifierProperty> properties; - private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { + private final ConsumingStrategy consumingStrategy; + + private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT; + + private Quantifier( + final ConsumingStrategy consumingStrategy, + final QuantifierProperty first, + final QuantifierProperty... 
rest) { this.properties = EnumSet.of(first, rest); + this.consumingStrategy = consumingStrategy; } - public static Quantifier ONE() { - return new Quantifier(QuantifierProperty.SINGLE); + public static Quantifier ONE(final ConsumingStrategy consumingStrategy) { + return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE); } - public static Quantifier ONE_OR_MORE() { - return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER); + public static Quantifier ONE_OR_MORE(final ConsumingStrategy consumingStrategy) { + return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING); } - public static Quantifier TIMES() { - return new Quantifier(QuantifierProperty.TIMES, QuantifierProperty.EAGER); + public static Quantifier TIMES(final ConsumingStrategy consumingStrategy) { + return new Quantifier(consumingStrategy, QuantifierProperty.TIMES); } public boolean hasProperty(QuantifierProperty property) { return properties.contains(property); } - public void combinations() { - if (!hasProperty(QuantifierProperty.SINGLE) && !hasProperty(Quantifier.QuantifierProperty.EAGER)) { - throw new MalformedPatternException("Combinations already allowed!"); - } + public ConsumingStrategy getConsumingStrategy() { + return consumingStrategy; + } - if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) { - properties.remove(Quantifier.QuantifierProperty.EAGER); - } else { - throw new MalformedPatternException("Combinations not applicable to " + this + "!"); + public ConsumingStrategy getInnerConsumingStrategy() { + return innerConsumingStrategy; + } + + private static void checkPattern(boolean condition, Object errorMessage) { + if (!condition) { + throw new MalformedPatternException(String.valueOf(errorMessage)); } } + public void combinations() { + checkPattern(!hasProperty(QuantifierProperty.SINGLE), "Combinations not applicable to " + this + "!"); + checkPattern(innerConsumingStrategy != 
ConsumingStrategy.STRICT, "You can apply apply either combinations or consecutive, not both!"); + checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "Combinations already applied!"); + + innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY; + } + public void consecutive() { - if (!hasProperty(QuantifierProperty.SINGLE) && hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) { - throw new MalformedPatternException("Strict continuity already applied!"); - } + checkPattern(hasProperty(QuantifierProperty.LOOPING) || hasProperty(QuantifierProperty.TIMES), "Combinations not applicable to " + this + "!"); + checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "You can apply apply either combinations or consecutive, not both!"); + checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "Combinations already applied!"); - if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) { - properties.add(Quantifier.QuantifierProperty.CONSECUTIVE); - } else { - throw new MalformedPatternException("Strict continuity not applicable to " + this + "!"); - } + innerConsumingStrategy = ConsumingStrategy.STRICT; } public void optional() { @@ -76,13 +90,21 @@ public class Quantifier { } @Override - public boolean equals(Object obj) { - return obj instanceof Quantifier && properties.equals(((Quantifier)obj).properties); + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Quantifier that = (Quantifier) o; + return Objects.equals(properties, that.properties) && + consumingStrategy == that.consumingStrategy; } @Override public int hashCode() { - return Objects.hashCode(properties); + return Objects.hash(properties, consumingStrategy); } /** @@ -92,9 +114,13 @@ public class Quantifier { SINGLE, LOOPING, TIMES, - EAGER, - CONSECUTIVE, OPTIONAL } + public enum ConsumingStrategy { + STRICT, + 
SKIP_TILL_NEXT, + SKIP_TILL_ANY + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 3a32175..f62c686 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -96,7 +96,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where( + .followedByAny("middle").subtype(SubEvent.class).where( new SimpleCondition<SubEvent>() { @Override @@ -105,7 +105,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } } ) - .followedBy("end").where(new SimpleCondition<Event>() { + .followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -171,7 +171,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { return value.getName().equals("start"); } }) - .followedBy("middle").subtype(SubEvent.class).where( + .followedByAny("middle").subtype(SubEvent.class).where( new SimpleCondition<SubEvent>() { @Override @@ -180,7 +180,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { } } ) - .followedBy("end").where(new SimpleCondition<Event>() { + .followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -250,13 +250,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + 
}).followedByAny("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -339,13 +339,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -416,7 +416,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { DataStream<Integer> input = env.fromElements(1, 2); - Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedBy("end").within(Time.days(1)); + Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedByAny("end").within(Time.days(1)); DataStream<Integer> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Integer, Integer>() { @Override @@ -470,13 +470,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { @Override public 
boolean filter(Event value) throws Exception { @@ -538,7 +538,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { return value.getName().equals("start"); } }) - .followedBy("middle") + .followedByAny("middle") .where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -551,7 +551,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { return value.getPrice() == 5.0; } }) - .followedBy("end").where(new SimpleCondition<Event>() { + .followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { @@ -623,13 +623,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } - }).followedBy("middle").where(new SimpleCondition<Event>() { + }).followedByAny("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).followedBy("end").where(new SimpleCondition<Event>() { + }).followedByAny("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception {