Repository: flink Updated Branches: refs/heads/master 545f50026 -> 0d856b34f
[FLINK-6248] [cep] Make the optional() available to all offered patterns. Allow all patterns to be optional in a pattern sequence. Singleton becomes simply optional, times(X) transforms from "exactly X" to "0 or X", and "one or more" becomes "zero or more". Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3de48d0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3de48d0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3de48d0 Branch: refs/heads/master Commit: b3de48d0e2c1fec6ccab3a8dd0f02b711dc0ae0a Parents: 545f500 Author: kl0u <[email protected]> Authored: Tue Apr 4 17:45:34 2017 +0200 Committer: kl0u <[email protected]> Committed: Wed Apr 26 12:15:51 2017 +0200 ---------------------------------------------------------------------- .../flink/cep/scala/pattern/Pattern.scala | 286 +++++----- .../flink/cep/nfa/compiler/NFACompiler.java | 42 +- .../org/apache/flink/cep/pattern/Pattern.java | 271 ++++------ .../apache/flink/cep/pattern/Quantifier.java | 90 ++-- .../org/apache/flink/cep/nfa/NFAITCase.java | 521 +++++++++++++++---- .../flink/cep/nfa/compiler/NFACompilerTest.java | 2 +- .../apache/flink/cep/pattern/PatternTest.java | 8 +- 7 files changed, 762 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index c636029..3214d09 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -20,7 +20,7 @@ package org.apache.flink.cep.scala.pattern import org.apache.flink.cep import org.apache.flink.cep.pattern.conditions.IterativeCondition import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context -import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern} +import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern} import org.apache.flink.streaming.api.windowing.time.Time /** @@ -31,9 +31,8 @@ import org.apache.flink.streaming.api.windowing.time.Time * * {{{ * Pattern<T, F> pattern = Pattern.<T>begin("start") - * .next("middle").subtype(F.class) - * .followedBy("end").where(new MyFilterFunction()); - * } + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyCondition()); * }}} * * @param jPattern Underlying Java API Pattern @@ -45,6 +44,13 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { private[flink] def wrappedPattern = jPattern /** + * @return The previous pattern + */ + def getPrevious(): Option[Pattern[T, _ <: T]] = { + wrapPattern(jPattern.getPrevious()) + } + + /** * * @return Name of the pattern operator */ @@ -64,170 +70,170 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { */ def getQuantifier: Quantifier = jPattern.getQuantifier - /** - * - * @return Filter condition for an event to be matched - */ def getCondition(): Option[IterativeCondition[F]] = { Option(jPattern.getCondition()) } /** - * Applies a subtype constraint on the current pattern operator. This means that an event has - * to be of the given subtype in order to be matched. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {{{AND}}}. In other case, this is going to be the only + * condition. * - * @param clazz Class of the subtype - * @tparam S Type of the subtype - * @return The same pattern operator with the new subtype constraint + * @param condition The condition as an [[IterativeCondition]]. + * @return The pattern with the new condition is set. */ - def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = { - jPattern.subtype(clazz) - this.asInstanceOf[Pattern[T, S]] - } - - /** - * Defines the maximum time interval for a matching pattern. This means that the time gap - * between first and the last event must not be longer than the window time. - * - * @param windowTime Time of the matching window - * @return The same pattern operator with the new window length - */ - def within(windowTime: Time): Pattern[T, F] = { - jPattern.within(windowTime) + def where(condition: IterativeCondition[F]): Pattern[T, F] = { + jPattern.where(condition) this } /** - * Appends a new pattern operator to the existing one. The new pattern operator enforces strict - * temporal contiguity. This means that the whole pattern only matches if an event which matches - * this operator directly follows the preceding matching event. Thus, there cannot be any - * events in between two matching events. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {{{AND}}}. In other case, this is going to be the only + * condition. * - * @param name Name of the new pattern operator - * @return A new pattern operator which is appended to this pattern operator + * @param condition The condition to be set. + * @return The pattern with the new condition is set. */ - def next(name: String): Pattern[T, T] = { - Pattern[T, T](jPattern.next(name)) - } + def where(condition: (F, Context[F]) => Boolean): Pattern[T, F] = { + val condFun = new IterativeCondition[F] { + val cleanCond = cep.scala.cleanClosure(condition) - /** - * Appends a new pattern operator to the existing one. The new pattern operator enforces - * non-strict temporal contiguity. This means that a matching event of this operator and the - * preceding matching event might be interleaved with other events which are ignored. - * - * @param name Name of the new pattern operator - * @return A new pattern operator which is appended to this pattern operator - */ - def followedBy(name: String): FollowedByPattern[T, T] = { - FollowedByPattern(jPattern.followedBy(name)) + override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value, ctx) + } + where(condFun) } /** - * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {{{AND}}}. In other case, this is going to be the only + * condition. * - * @param filter Filter condition - * @return The same pattern operator where the new filter condition is set + * @param condition The condition to be set. + * @return The pattern with the new condition is set. */ - def where(filter: IterativeCondition[F]): Pattern[T, F] = { - jPattern.where(filter) - this + def where(condition: F => Boolean): Pattern[T, F] = { + val condFun = new IterativeCondition[F] { + val cleanCond = cep.scala.cleanClosure(condition) + + override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value) + } + where(condFun) } /** - * Specifies a filter condition which is ORed with an existing filter function. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {{{OR}}}. In other case, this is going to be the only + * condition. * - * @param filter Or filter function - * @return The same pattern operator where the new filter condition is set + * @param condition The condition as an [[IterativeCondition]]. + * @return The pattern with the new condition is set. */ - def or(filter: IterativeCondition[F]): Pattern[T, F] = { - jPattern.or(filter) + def or(condition: IterativeCondition[F]): Pattern[T, F] = { + jPattern.or(condition) this } /** - * Specifies a filter condition which is ORed with an existing filter function. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {{{OR}}}. In other case, this is going to be the only + * condition. * - * @param filterFun Or filter function - * @return The same pattern operator where the new filter condition is set + * @param condition The {{{OR}}} condition. + * @return The pattern with the new condition is set. */ - def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = { - val filter = new IterativeCondition[F] { - val cleanFilter = cep.scala.cleanClosure(filterFun) + def or(condition: (F, Context[F]) => Boolean): Pattern[T, F] = { + val condFun = new IterativeCondition[F] { + val cleanCond = cep.scala.cleanClosure(condition) - override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx) + override def filter(value: F, ctx: Context[F]): Boolean = cleanCond(value, ctx) } - or(filter) + or(condFun) } /** - * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * Applies a subtype constraint on the current pattern. This means that an event has + * to be of the given subtype in order to be matched. * - * @param filterFun Filter condition - * @return The same pattern operator where the new filter condition is set + * @param clazz Class of the subtype + * @tparam S Type of the subtype + * @return The same pattern with the new subtype constraint */ - def where(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = { - val filter = new IterativeCondition[F] { - val cleanFilter = cep.scala.cleanClosure(filterFun) - - override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx) - } - where(filter) + def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = { + jPattern.subtype(clazz) + this.asInstanceOf[Pattern[T, S]] } /** - * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * Defines the maximum time interval in which a matching pattern has to be completed in + * order to be considered valid. This interval corresponds to the maximum time gap between first + * and the last event. * - * @param filterFun Filter condition - * @return The same pattern operator where the new filter condition is set + * @param windowTime Time of the matching window + * @return The same pattern operator with the new window length */ - def where(filterFun: F => Boolean): Pattern[T, F] = { - val filter = new IterativeCondition[F] { - val cleanFilter = cep.scala.cleanClosure(filterFun) - - override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value) - } - where(filter) + def within(windowTime: Time): Pattern[T, F] = { + jPattern.within(windowTime) + this } /** + * Appends a new pattern to the existing one. The new pattern enforces strict + * temporal contiguity. This means that the whole pattern sequence matches only + * if an event which matches this pattern directly follows the preceding matching + * event. Thus, there cannot be any events in between two matching events. * - * @return The previous pattern operator + * @param name Name of the new pattern + * @return A new pattern which is appended to this one */ - def getPrevious(): Option[Pattern[T, _ <: T]] = { - wrapPattern(jPattern.getPrevious()) + def next(name: String): Pattern[T, T] = { + Pattern[T, T](jPattern.next(name)) } /** - * Specifies that this pattern can occur zero or more times(kleene star). - * This means any number of events can be matched in this state. + * Appends a new pattern to the existing one. The new pattern enforces non-strict + * temporal contiguity. This means that a matching event of this pattern and the + * preceding matching event might be interleaved with other events which are ignored. * - * @return The same pattern with applied Kleene star operator + * @param name Name of the new pattern + * @return A new pattern which is appended to this one */ - def zeroOrMore: Pattern[T, F] = { - jPattern.zeroOrMore() - this - } + def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name)) /** - * Specifies that this pattern can occur zero or more times(kleene star). - * This means any number of events can be matched in this state. - * - * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns: - * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B. + * Specifies that this pattern is optional for a final match of the pattern + * sequence to happen. * - * @param eager if true the pattern always consumes earlier events - * @return The same pattern with applied Kleene star operator + * @return The same pattern as optional. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - def zeroOrMore(eager: Boolean): Pattern[T, F] = { - jPattern.zeroOrMore(eager) + def optional: Pattern[T, F] = { + jPattern.optional() this } /** - * Specifies that this pattern can occur one or more times(kleene star). - * This means at least one and at most infinite number of events can be matched in this state. + * Specifies that this pattern can occur {{{one or more}}} times. + * This means at least one and at most infinite number of events can + * be matched to this pattern. + * + * If this quantifier is enabled for a + * pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events + * {{{A1 A2 B}}} appears, this will generate patterns: + * {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}. * - * @return The same pattern with applied Kleene plus operator + * @return The same pattern with a [[Quantifier.ONE_OR_MORE()]] quantifier applied. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ def oneOrMore: Pattern[T, F] = { jPattern.oneOrMore() @@ -235,63 +241,51 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { } /** - * Specifies that this pattern can occur one or more times(kleene star). - * This means at least one and at most infinite number of events can be matched in this state. - * - * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns: - * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B. + * Specifies exact number of times that this pattern should be matched. * - * @param eager if true the pattern always consumes earlier events - * @return The same pattern with applied Kleene plus operator + * @param times number of times matching event must appear + * @return The same pattern with number of times applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - def oneOrMore(eager: Boolean): Pattern[T, F] = { - jPattern.oneOrMore(eager) + def times(times: Int): Pattern[T, F] = { + jPattern.times(times) this } /** - * Specifies that this pattern can occur zero or once. + * Applicable only to [[Quantifier.ONE_OR_MORE()]] patterns, this option + * allows more flexibility to the matching events. * - * @return The same pattern with applied Kleene ? operator - */ - def optional: Pattern[T, F] = { - jPattern.optional() - this - } - - /** - * Specifies exact number of times that this pattern should be matched. + * If {{{allowCombinations()}}} is not applied for a + * pattern {{{A.oneOrMore().followedBy(B)}}} and a sequence of events + * {{{A1 A2 B}}} appears, this will generate patterns: + * {{{A1 B}}} and {{{A1 A2 B}}}. If this method is applied, we + * will have {{{A1 B}}}, {{{A2 B}}} and {{{A1 A2 B}}}. * - * @param times number of times matching event must appear - * @return The same pattern with number of times applied + * @return The same pattern with the updated quantifier. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - def times(times: Int): Pattern[T, F] = { - jPattern.times(times) + def allowCombinations(): Pattern[T, F] = { + jPattern.allowCombinations() this } - /** - * Works in conjunction with [[org.apache.flink.cep.scala.pattern.Pattern#zeroOrMore()]], - * [[org.apache.flink.cep.scala.pattern.Pattern#oneOrMore()]] or - * [[org.apache.flink.cep.scala.pattern.Pattern#times(int)]]. + * Works in conjunction with [[Pattern#oneOrMore()]] or [[Pattern#times(int)]]. * Specifies that any not matching element breaks the loop. * - * <p>E.g. a pattern like: + * E.g. a pattern like: * {{{ * Pattern.begin("start").where(_.getName().equals("c")) - * .followedBy("middle").where(_.getName().equals("a")).oneOrMore(true).consecutive() + * .followedBy("middle").where(_.getName().equals("a")).oneOrMore().consecutive() * .followedBy("end1").where(_.getName().equals("b")); * }}} * - * <p>for a sequence: C D A1 A2 A3 D A4 B - * - * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B} + * For a sequence: C D A1 A2 A3 D A4 B * - * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore, - * oneOrMore or times was previously applied! + * will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B} * - * <p>By default a relaxed continuity is applied. + * By default a relaxed continuity is applied. * @return pattern with continuity changed to strict */ def consecutive(): Pattern[T, F] = { @@ -314,12 +308,12 @@ object Pattern { def apply[T, F <: T](jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern) /** - * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore, - * the base type of the event sequence is set. + * Starts a new pattern sequence. The provided name is the one of the initial pattern + * of the new sequence. Furthermore, the base type of the event sequence is set. * - * @param name Name of the new pattern operator + * @param name The name of starting pattern of the new pattern sequence * @tparam X Base type of the event pattern - * @return The first pattern operator of a pattern + * @return The first pattern of a pattern sequence */ def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name)) http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 5ab44c0..add1583 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -31,7 +31,6 @@ import org.apache.flink.cep.pattern.FollowedByPattern; import org.apache.flink.cep.pattern.MalformedPatternException; import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.cep.pattern.Pattern; -import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.streaming.api.windowing.time.Time; @@ -166,10 +165,10 @@ public class NFACompiler { if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { final State<T> looping = createLooping(lastSink); - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) { + if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { lastSink = createFirstMandatoryStateOfLoop(looping); } else if (currentPattern instanceof FollowedByPattern && - currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) { + currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) { lastSink = createWaitingStateForZeroOrMore(looping, lastSink); } else { lastSink = looping; @@ -237,7 +236,7 @@ public class NFACompiler { final State<T> beginningState; if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) { final State<T> loopingState = createLooping(sinkState); - if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) { + if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { beginningState = createFirstMandatoryStateOfLoop(loopingState); } else { beginningState = loopingState; @@ -266,9 +265,29 @@ public class NFACompiler { for (int i = 0; i < times - 1; i++) { lastSink = createSingletonState( lastSink, - !currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)); + !currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE), + false); } - return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern); + + // we created the intermediate states in the loop, now we create the start of the loop. + if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) { + return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern, false); + } + + final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition(); + final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); + + final State<T> singletonState = createNormalState(); + singletonState.addTake(lastSink, currentFilterFunction); + singletonState.addProceed(sinkState, trueFunction); + + if (currentPattern instanceof FollowedByPattern) { + State<T> ignoreState = createNormalState(); + ignoreState.addTake(lastSink, currentFilterFunction); + ignoreState.addIgnore(BooleanConditions.<T>trueFunction()); + singletonState.addIgnore(ignoreState, trueFunction); + } + return singletonState; } /** @@ -281,7 +300,8 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createSingletonState(final State<T> sinkState) { - return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern); + return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern, + currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)); } /** @@ -294,20 +314,20 @@ public class NFACompiler { * @return the created state */ @SuppressWarnings("unchecked") - private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore) { + private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore, boolean isOptional) { final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition(); final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction(); final State<T> singletonState = createNormalState(); singletonState.addTake(sinkState, currentFilterFunction); - if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) { + if (isOptional) { singletonState.addProceed(sinkState, trueFunction); } if (addIgnore) { final State<T> ignoreState; - if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) { + if (isOptional) { ignoreState = createNormalState(); ignoreState.addTake(sinkState, currentFilterFunction); } else { @@ -356,7 +376,7 @@ public class NFACompiler { loopingState.addProceed(sinkState, trueFunction); loopingState.addTake(filterFunction); - if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) { + if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) { final State<T> ignoreState = createNormalState(); final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 92b655e..f1d0d63 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -30,13 +30,12 @@ import org.apache.flink.util.Preconditions; /** * Base class for a pattern definition. * <p> - * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create - * a {@link NFA}. + * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}. * * <pre>{@code * Pattern<T, F> pattern = Pattern.<T>begin("start") * .next("middle").subtype(F.class) - * .followedBy("end").where(new MyFilterFunction()); + * .followedBy("end").where(new MyCondition()); * } * </pre> * @@ -45,20 +44,25 @@ import org.apache.flink.util.Preconditions; */ public class Pattern<T, F extends T> { - // name of the pattern operator + /** Name of the pattern. */ private final String name; - // previous pattern operator + /** Previous pattern. */ private final Pattern<T, ? extends T> previous; - // filter condition for an event to be matched + /** The condition an event has to satisfy to be considered a matched. */ private IterativeCondition<F> condition; - // window length in which the pattern match has to occur + /** Window length in which the pattern match has to occur. */ private Time windowTime; - private Quantifier quantifier = Quantifier.ONE; + /** A quantifier for the pattern. By default set to {@link Quantifier#ONE()}. */ + private Quantifier quantifier = Quantifier.ONE(); + /** + * Applicable to a {@code times} pattern, and holds + * the number of times it has to appear. + */ private int times; protected Pattern(final String name, final Pattern<T, ? extends T> previous) { @@ -66,16 +70,16 @@ public class Pattern<T, F extends T> { this.previous = previous; } - public String getName() { - return name; - } - public Pattern<T, ? extends T> getPrevious() { return previous; } - public IterativeCondition<F> getCondition() { - return condition; + public int getTimes() { + return times; + } + + public String getName() { + return name; } public Time getWindowTime() { @@ -86,15 +90,31 @@ public class Pattern<T, F extends T> { return quantifier; } - public int getTimes() { - return times; + public IterativeCondition<F> getCondition() { + return condition; } /** - * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * Starts a new pattern sequence. The provided name is the one of the initial pattern + * of the new sequence. Furthermore, the base type of the event sequence is set. * - * @param condition Filter condition - * @return The same pattern operator where the new filter condition is set + * @param name The name of starting pattern of the new pattern sequence + * @param <X> Base type of the event pattern + * @return The first pattern of a pattern sequence + */ + public static <X> Pattern<X, X> begin(final String name) { + return new Pattern<X, X>(name, null); + } + + /** + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {@code AND}. In other case, this is going to be the only + * condition. + * + * @param condition The condition as an {@link IterativeCondition}. + * @return The pattern with the new condition is set. */ public Pattern<T, F> where(IterativeCondition<F> condition) { ClosureCleaner.clean(condition, true); @@ -108,10 +128,14 @@ public class Pattern<T, F extends T> { } /** - * Specifies a filter condition which is OR'ed with an existing filter function. + * Adds a condition that has to be satisfied by an event + * in order to be considered a match. If another condition has already been + * set, the new one is going to be combined with the previous with a + * logical {@code OR}. In other case, this is going to be the only + * condition. * - * @param condition OR filter condition - * @return The same pattern operator where the new filter condition is set + * @param condition The condition as an {@link IterativeCondition}. + * @return The pattern with the new condition is set. */ public Pattern<T, F> or(IterativeCondition<F> condition) { ClosureCleaner.clean(condition, true); @@ -125,19 +149,18 @@ public class Pattern<T, F extends T> { } /** - * Applies a subtype constraint on the current pattern operator. This means that an event has + * Applies a subtype constraint on the current pattern. This means that an event has * to be of the given subtype in order to be matched. * * @param subtypeClass Class of the subtype * @param <S> Type of the subtype - * @return The same pattern operator with the new subtype constraint + * @return The same pattern with the new subtype constraint */ public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) { if (condition == null) { this.condition = new SubtypeCondition<F>(subtypeClass); } else { - this.condition = new AndCondition<>(this.condition, - new SubtypeCondition<F>(subtypeClass)); + this.condition = new AndCondition<>(condition, new SubtypeCondition<F>(subtypeClass)); } @SuppressWarnings("unchecked") @@ -147,8 +170,9 @@ public class Pattern<T, F extends T> { } /** - * Defines the maximum time interval for a matching pattern. This means that the time gap - * between first and the last event must not be longer than the window time. + * Defines the maximum time interval in which a matching pattern has to be completed in + * order to be considered valid. This interval corresponds to the maximum time gap between first + * and the last event. * * @param windowTime Time of the matching window * @return The same pattern operator with the new window length @@ -162,130 +186,114 @@ public class Pattern<T, F extends T> { } /** - * Appends a new pattern operator to the existing one. The new pattern operator enforces strict - * temporal contiguity. This means that the whole pattern only matches if an event which matches - * this operator directly follows the preceding matching event. Thus, there cannot be any - * events in between two matching events. + * Appends a new pattern to the existing one. The new pattern enforces strict + * temporal contiguity. This means that the whole pattern sequence matches only + * if an event which matches this pattern directly follows the preceding matching + * event. Thus, there cannot be any events in between two matching events. * - * @param name Name of the new pattern operator - * @return A new pattern operator which is appended to this pattern operator + * @param name Name of the new pattern + * @return A new pattern which is appended to this one */ public Pattern<T, T> next(final String name) { return new Pattern<T, T>(name, this); } /** - * Appends a new pattern operator to the existing one. The new pattern operator enforces - * non-strict temporal contiguity. This means that a matching event of this operator and the + * Appends a new pattern to the existing one. The new pattern enforces non-strict + * temporal contiguity. This means that a matching event of this pattern and the * preceding matching event might be interleaved with other events which are ignored. * - * @param name Name of the new pattern operator - * @return A new pattern operator which is appended to this pattern operator + * @param name Name of the new pattern + * @return A new pattern which is appended to this one */ public FollowedByPattern<T, T> followedBy(final String name) { return new FollowedByPattern<T, T>(name, this); } /** - * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore, - * the base type of the event sequence is set. - * - * @param name Name of the new pattern operator - * @param <X> Base type of the event pattern - * @return The first pattern operator of a pattern - */ - public static <X> Pattern<X, X> begin(final String name) { - return new Pattern<X, X>(name, null); - } - - /** - * Specifies that this pattern can occur zero or more times(kleene star). - * This means any number of events can be matched in this state. + * Specifies that this pattern is optional for a final match of the pattern + * sequence to happen. * - * @return The same pattern with applied Kleene star operator - * - * @throws MalformedPatternException if quantifier already applied + * @return The same pattern as optional. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> zeroOrMore() { - return zeroOrMore(true); + public Pattern<T, F> optional() { + quantifier.makeOptional(); + return this; } /** - * Specifies that this pattern can occur zero or more times(kleene star). - * This means any number of events can be matched in this state. - * - * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns: - * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B. + * Specifies that this pattern can occur {@code one or more} times. + * This means at least one and at most infinite number of events can + * be matched to this pattern. * - * @param eager if true the pattern always consumes earlier events - * @return The same pattern with applied Kleene star operator + * <p>If this quantifier is enabled for a + * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events + * {@code A1 A2 B} appears, this will generate patterns: + * {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}. * - * @throws MalformedPatternException if quantifier already applied + * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} quantifier applied. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> zeroOrMore(final boolean eager) { + public Pattern<T, F> oneOrMore() { checkIfQuantifierApplied(); - if (eager) { - this.quantifier = Quantifier.ZERO_OR_MORE_EAGER; - } else { - this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS; - } + this.quantifier = Quantifier.ONE_OR_MORE(); return this; } /** - * Specifies that this pattern can occur one or more times(kleene star). - * This means at least one and at most infinite number of events can be matched in this state. + * Specifies exact number of times that this pattern should be matched. * - * @return The same pattern with applied Kleene plus operator + * @param times number of times matching event must appear + * @return The same pattern with number of times applied * - * @throws MalformedPatternException if quantifier already applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> oneOrMore() { - return oneOrMore(true); + public Pattern<T, F> times(int times) { + checkIfQuantifierApplied(); + Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); + this.quantifier = Quantifier.TIMES(); + this.times = times; + return this; } /** - * Specifies that this pattern can occur one or more times(kleene star). - * This means at least one and at most infinite number of events can be matched in this state. + * Applicable only to {@link Quantifier#ONE_OR_MORE()} patterns, this option + * allows more flexibility to the matching events. * - * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns: - * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B. + * <p>If {@code allowCombinations()} is not applied for a + * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events + * {@code A1 A2 B} appears, this will generate patterns: + * {@code A1 B} and {@code A1 A2 B}. If this method is applied, we + * will have {@code A1 B}, {@code A2 B} and {@code A1 A2 B}. * - * @param eager if true the pattern always consumes earlier events - * @return The same pattern with applied Kleene plus operator - * - * @throws MalformedPatternException if quantifier already applied + * @return The same pattern with the updated quantifier. * + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ - public Pattern<T, F> oneOrMore(final boolean eager) { - checkIfQuantifierApplied(); - if (eager) { - this.quantifier = Quantifier.ONE_OR_MORE_EAGER; - } else { - this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS; - } + public Pattern<T, F> allowCombinations() { + quantifier.allowAllCombinations(); return this; } /** - * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}. + * Works in conjunction with {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}. * Specifies that any not matching element breaks the loop. * * <p>E.g. a pattern like: * <pre>{@code - * Pattern.<Event>begin("start").where(new FilterFunction<Event>() { + * Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { * @Override * public boolean filter(Event value) throws Exception { * return value.getName().equals("c"); * } * }) - * .followedBy("middle").where(new FilterFunction<Event>() { + * .followedBy("middle").where(new SimpleCondition<Event>() { * @Override * public boolean filter(Event value) throws Exception { * return value.getName().equals("a"); * } - * }) - * }<b>.oneOrMore(true).consecutive()</b>{@code - * .followedBy("end1").where(new FilterFunction<Event>() { + * }).oneOrMore().consecutive() + * .followedBy("end1").where(new SimpleCondition<Event>() { * @Override * public boolean filter(Event value) throws Exception { * return value.getName().equals("b"); @@ -297,78 +305,19 @@ public class Pattern<T, F extends T> { * * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B} * - * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore, - * oneOrMore or times was previously applied! - * * <p>By default a relaxed continuity is applied. * * @return pattern with continuity changed to strict */ public Pattern<T, F> consecutive() { - switch (this.quantifier) { - - case ZERO_OR_MORE_EAGER: - this.quantifier = Quantifier.ZERO_OR_MORE_EAGER_STRICT; - break; - case ZERO_OR_MORE_COMBINATIONS: - this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT; - break; - case ONE_OR_MORE_EAGER: - this.quantifier = Quantifier.ONE_OR_MORE_EAGER_STRICT; - break; - case ONE_OR_MORE_COMBINATIONS: - this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT; - break; - case TIMES: - this.quantifier = Quantifier.TIMES_STRICT; - break; - case ZERO_OR_MORE_COMBINATIONS_STRICT: - case ONE_OR_MORE_EAGER_STRICT: - case ONE_OR_MORE_COMBINATIONS_STRICT: - case ZERO_OR_MORE_EAGER_STRICT: - case TIMES_STRICT: - throw new MalformedPatternException("Strict continuity already applied! consecutive() called twice."); - case ONE: - case OPTIONAL: - throw new MalformedPatternException("Strict continuity cannot be applied to " + this.quantifier); - } - - return this; - } - - /** - * Specifies that this pattern can occur zero or once. - * - * @return The same pattern with applied Kleene ? operator - * - * @throws MalformedPatternException if quantifier already applied - */ - public Pattern<T, F> optional() { - checkIfQuantifierApplied(); - this.quantifier = Quantifier.OPTIONAL; - return this; - } - - /** - * Specifies exact number of times that this pattern should be matched. - * - * @param times number of times matching event must appear - * @return The same pattern with number of times applied - * - * @throws MalformedPatternException if quantifier already applied - */ - public Pattern<T, F> times(int times) { - checkIfQuantifierApplied(); - Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0."); - this.quantifier = Quantifier.TIMES; - this.times = times; + quantifier.makeConsecutive(); return this; } private void checkIfQuantifierApplied() { - if (this.quantifier != Quantifier.ONE) { - throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier); + if (!quantifier.hasProperty(Quantifier.QuantifierProperty.SINGLE)) { + throw new MalformedPatternException("Already applied quantifier to this Pattern. " + + "Current quantifier is: " + quantifier); } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index 9789072..9deacd4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -18,51 +18,83 @@ package org.apache.flink.cep.pattern; import java.util.EnumSet; +import java.util.Objects; -public enum Quantifier { - ONE, - ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER), - ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING), - ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, QuantifierProperty.LOOPING), - ONE_OR_MORE_EAGER( - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_EAGER_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.EAGER, - QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE), - ONE_OR_MORE_COMBINATIONS_STRICT( - QuantifierProperty.STRICT, - QuantifierProperty.LOOPING, - QuantifierProperty.AT_LEAST_ONE), - TIMES(QuantifierProperty.TIMES), - TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT), - OPTIONAL; +public class Quantifier { private final EnumSet<QuantifierProperty> properties; - Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { + private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) { this.properties = EnumSet.of(first, rest); } - Quantifier() { - this.properties = EnumSet.noneOf(QuantifierProperty.class); + public static Quantifier ONE() { + return new Quantifier(QuantifierProperty.SINGLE); + } + + public static Quantifier ONE_OR_MORE() { + return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER); + } + + public static Quantifier TIMES() { + return new Quantifier(QuantifierProperty.TIMES); } public boolean hasProperty(QuantifierProperty property) { return properties.contains(property); } + public void allowAllCombinations() { + if (!hasProperty(Quantifier.QuantifierProperty.EAGER)) { + throw new MalformedPatternException("Combinations already allowed!"); + } + + if (hasProperty(Quantifier.QuantifierProperty.LOOPING)) { + properties.remove(Quantifier.QuantifierProperty.EAGER); + } else { + throw new MalformedPatternException("Combinations not applicable to " + this + "!"); + } + } + + public void makeConsecutive() { + if (hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) { + throw new MalformedPatternException("Strict continuity already applied!"); + } + + if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) { + properties.add(Quantifier.QuantifierProperty.CONSECUTIVE); + } else { + throw new MalformedPatternException("Strict continuity not applicable to " + this + "!"); + } + } + + public void makeOptional() { + if (hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { + throw new MalformedPatternException("Optional already applied!"); + } + properties.add(Quantifier.QuantifierProperty.OPTIONAL); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Quantifier && properties.equals(((Quantifier)obj).properties); + } + + @Override + public int hashCode() { + return Objects.hashCode(properties); + } + + /** + * Properties that a {@link Quantifier} can have. Not all combinations are valid. + */ public enum QuantifierProperty { + SINGLE, LOOPING, + TIMES, EAGER, - AT_LEAST_ONE, - STRICT, - TIMES + CONSECUTIVE, + OPTIONAL } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 7359ca8..cffb4e7 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -456,7 +456,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -544,7 +544,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -609,7 +609,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(true).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -664,7 +664,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore().followedBy("end").where(new SimpleCondition<Event>() { + }).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -731,14 +731,14 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("middle-second").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("middle-second").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -824,7 +824,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("d"); } - }).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -892,7 +892,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore() + }).oneOrMore().optional() .next("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @@ -947,7 +947,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).next("end").where(new SimpleCondition<Event>() { + }).oneOrMore().optional().allowCombinations().next("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1007,7 +1007,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1061,7 +1061,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1129,7 +1129,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } - }).zeroOrMore().consecutive().followedBy("end").where(new SimpleCondition<Event>() { + }).oneOrMore().optional().consecutive().followedBy("end").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 7056763917392056548L; @Override @@ -1190,7 +1190,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(true).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -1602,7 +1602,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(); + }).oneOrMore().optional(); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -1656,7 +1656,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(); + }).oneOrMore().optional(); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -1788,6 +1788,228 @@ public class NFAITCase extends TestLogger { ), resultingPatterns); } + /////////////////////////////// Optional //////////////////////////////////////// + + @Test + public void testTimesNonStrictOptional1() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(3).optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesNonStrictOptional2() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).optional().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesStrictOptional() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testTimesStrictOptional1() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testOptionalTimesNonStrictWithNext() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).next("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).optional().followedBy("end1").where(new SimpleCondition<Event>() { // TODO: 4/4/17 also check order consecutive() vs optional() + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } /////////////////////////////// Consecutive //////////////////////////////////////// @@ -1841,29 +2063,56 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 7)); inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8)); - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + Pattern<Event, ?> pattern; + if (eager) { + pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).oneOrMore(eager).consecutive() - .followedBy("end1").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().consecutive() + .followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + } else { + pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); + return value.getName().equals("a"); } - }); + }).oneOrMore().allowCombinations().consecutive() + .followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + } NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -1905,28 +2154,51 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + Pattern<Event, ?> pattern = eager + ? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("c"); - } - }).followedBy("middle").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("a"); - } - }).zeroOrMore(eager).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("b"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }) + : Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().allowCombinations().optional().consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -2033,7 +2305,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore().consecutive(); + }).oneOrMore().optional().consecutive(); testStartWithOneOrZeroOrMoreStrict(pattern); } @@ -2183,7 +2455,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).oneOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2227,7 +2499,7 @@ public class NFAITCase extends TestLogger { public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } - }).zeroOrMore(false).followedBy("end1").where(new SimpleCondition<Event>() { + }).oneOrMore().allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() { private static final long serialVersionUID = 5726188262756267490L; @Override @@ -2313,37 +2585,41 @@ public class NFAITCase extends TestLogger { inputEvents.add(new StreamRecord<>(nextOne, 6)); inputEvents.add(new StreamRecord<>(endEvent, 8)); - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("start"); - } - }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() { - private static final long serialVersionUID = 6215754202506583964L; - - @Override - public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception { - if (!value.getName().startsWith("foo")) { - return false; - } + Pattern<Event, ?> pattern = eager + ? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - double sum = 0.0; - for (Event event : ctx.getEventsForPattern("middle")) { - sum += event.getPrice(); - } - sum += value.getPrice(); - return Double.compare(sum, 5.0) < 0; - } - }).oneOrMore(eager).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 7056763917392056548L; + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore() + .followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }) + : Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }); + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + }) + .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore().allowCombinations() + .followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -2352,6 +2628,25 @@ public class NFAITCase extends TestLogger { return resultingPatterns; } + private static class MySubeventIterCondition extends IterativeCondition<SubEvent> { + + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception { + if (!value.getName().startsWith("foo")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("middle")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + } + @Test public void testIterativeWithLoopingStartingEager() { List<List<Event>> actual = testIterativeWithLoopingStarting(true); @@ -2395,30 +2690,25 @@ public class NFAITCase extends TestLogger { // behavior (which is the one applied in the case that the pattern graph starts with such a pattern) // of a looping pattern is with relaxed continuity (as in followedBy). - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() { - private static final long serialVersionUID = 6215754202506583964L; - - @Override - public boolean filter(Event value, Context<Event> ctx) throws Exception { - if (!value.getName().equals("start")) { - return false; - } - - double sum = 0.0; - for (Event event : ctx.getEventsForPattern("start")) { - sum += event.getPrice(); - } - sum += value.getPrice(); - return Double.compare(sum, 5.0) < 0; - } - }).zeroOrMore(eager).followedBy("end").where(new SimpleCondition<Event>() { - private static final long serialVersionUID = 7056763917392056548L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }); + Pattern<Event, ?> pattern = eager + ? Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().optional() + .followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }) + : Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().allowCombinations().optional() + .followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + }); NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); @@ -2427,6 +2717,25 @@ public class NFAITCase extends TestLogger { return resultingPatterns; } + private static class MyEventIterCondition extends IterativeCondition<Event> { + + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(Event value, Context<Event> ctx) throws Exception { + if (!value.getName().equals("start")) { + return false; + } + + double sum = 0.0; + for (Event event : ctx.getEventsForPattern("start")) { + sum += event.getPrice(); + } + sum += value.getPrice(); + return Double.compare(sum, 5.0) < 0; + } + } + @Test public void testIterativeWithPrevPatternDependency() { List<StreamRecord<Event>> inputEvents = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index 80b1bcb..da1f9c8 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -153,7 +153,7 @@ public class NFACompilerTest extends TestLogger { public void testNFACompilerWithKleeneStar() { Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter) - .followedBy("middle").subtype(SubEvent.class).zeroOrMore() + .followedBy("middle").subtype(SubEvent.class).oneOrMore().optional() .followedBy("end").where(endFilter); NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false); http://git-wip-us.apache.org/repos/asf/flink/blob/b3de48d0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java index e9aa7d2..8c4304a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java @@ -196,7 +196,7 @@ public class PatternTest extends TestLogger { public boolean filter(Object value) throws Exception { return true; } - }).oneOrMore().zeroOrMore(); + }).oneOrMore().oneOrMore().optional(); } @Test(expected = MalformedPatternException.class) @@ -209,7 +209,7 @@ public class PatternTest extends TestLogger { public boolean filter(Object value) throws Exception { return true; } - }).zeroOrMore().times(1); + }).oneOrMore().optional().times(1); } @Test(expected = MalformedPatternException.class) @@ -235,7 +235,7 @@ public class PatternTest extends TestLogger { public boolean filter(Object value) throws Exception { return true; } - }).oneOrMore().oneOrMore(true); + }).oneOrMore().oneOrMore(); } @Test(expected = MalformedPatternException.class) @@ -248,6 +248,6 @@ public class PatternTest extends TestLogger { public boolean filter(Object value) throws Exception { return true; } - }).oneOrMore(true).zeroOrMore(true); + }).oneOrMore().oneOrMore().optional(); } }
