comments
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad21a441 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad21a441 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad21a441 Branch: refs/heads/table-retraction Commit: ad21a441434b9ac5886b664871553bf57885e984 Parents: 7fbdc10 Author: kl0u <kklou...@gmail.com> Authored: Tue Mar 28 10:46:49 2017 +0200 Committer: kl0u <kklou...@gmail.com> Committed: Tue Mar 28 11:08:04 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 15 +++------------ .../org/apache/flink/cep/scala/pattern/Pattern.scala | 15 +++++++++++++++ .../src/main/java/org/apache/flink/cep/nfa/NFA.java | 6 +++--- .../cep/pattern/conditions/IterativeCondition.java | 2 +- 4 files changed, 22 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 9d4ca91..932ba30 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -146,11 +146,10 @@ start.where(new IterativeCondition<SubEvent>() { return false; } - double sum = 0.0; + double sum = value.getPrice(); for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getPrice(); } - sum += value.getPrice(); return Double.compare(sum, 5.0) < 0; } }); @@ -161,16 +160,8 @@ start.where(new IterativeCondition<SubEvent>() { {% highlight scala %} start.where( (value, ctx) => { - var res = value.getName.startsWith("foo") - if (res) { - var sum = 0.0 - for (e: Event <- ctx.getEventsForPattern("middle")) { - sum += e.getPrice - } - sum += value.getPrice - res = res && sum < 5.0 - } - res + lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum + value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } ) {% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index a1db460..07dfc5a 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -145,6 +145,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { } /** + * Specifies a filter condition which is ORed with an existing filter function. + * + * @param filterFun Or filter function + * @return The same pattern operator where the new filter condition is set + */ + def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = { + val filter = new IterativeCondition[F] { + val cleanFilter = cep.scala.cleanClosure(filterFun) + + override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx) + } + or(filter) + } + + /** * Specifies a filter condition which has to be fulfilled by an event in order to be matched. * * @param filterFun Filter condition http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index cddc1ed..98c1fc9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -92,7 +92,7 @@ public class NFA<T> implements Serializable { private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer; /** - * Used only for backward compatibility. Buffer used to store the matched events. + * Used only for backwards compatibility. Buffer used to store the matched events. */ private final SharedBuffer<State<T>, T> sharedBuffer = null; @@ -575,7 +575,7 @@ public class NFA<T> implements Serializable { computationState.getVersion()); // for a given computation state, we cannot have more than one matching patterns. - Preconditions.checkArgument(paths.size() <= 1); + Preconditions.checkState(paths.size() <= 1); TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); @@ -609,7 +609,7 @@ public class NFA<T> implements Serializable { computationState.getVersion()); // for a given computation state, we cannot have more than one matching patterns. - Preconditions.checkArgument(paths.size() <= 1); + Preconditions.checkState(paths.size() <= 1); List<Map<String, T>> result = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java index f225e01..016cdef 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java @@ -88,7 +88,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable { /** * @return An {@link Iterable} over the already accepted elements - * for a given pattern. Elements are iterated in the order the were + * for a given pattern. Elements are iterated in the order they were * inserted in the pattern. * * @param name The name of the pattern.