comments

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad21a441
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad21a441
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad21a441

Branch: refs/heads/table-retraction
Commit: ad21a441434b9ac5886b664871553bf57885e984
Parents: 7fbdc10
Author: kl0u <kklou...@gmail.com>
Authored: Tue Mar 28 10:46:49 2017 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Tue Mar 28 11:08:04 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                                 | 15 +++------------
 .../org/apache/flink/cep/scala/pattern/Pattern.scala | 15 +++++++++++++++
 .../src/main/java/org/apache/flink/cep/nfa/NFA.java  |  6 +++---
 .../cep/pattern/conditions/IterativeCondition.java   |  2 +-
 4 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 9d4ca91..932ba30 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -146,11 +146,10 @@ start.where(new IterativeCondition<SubEvent>() {
             return false;
         }
         
-        double sum = 0.0;
+        double sum = value.getPrice();
         for (Event event : ctx.getEventsForPattern("middle")) {
             sum += event.getPrice();
         }
-        sum += value.getPrice();
         return Double.compare(sum, 5.0) < 0;
     }
 });
@@ -161,16 +160,8 @@ start.where(new IterativeCondition<SubEvent>() {
 {% highlight scala %}
 start.where(
     (value, ctx) => {
-        var res = value.getName.startsWith("foo")
-        if (res) {
-            var sum = 0.0
-            for (e: Event <- ctx.getEventsForPattern("middle")) {
-                sum += e.getPrice
-            }
-            sum += value.getPrice
-            res = res && sum < 5.0
-        }
-        res
+        lazy val sum = 
ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
+        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
     }
 )
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index a1db460..07dfc5a 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -145,6 +145,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Specifies a filter condition which is ORed with an existing filter 
function.
+    *
+    * @param filterFun Or filter function
+    * @return The same pattern operator where the new filter condition is set
+    */
+  def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val filter = new IterativeCondition[F] {
+      val cleanFilter = cep.scala.cleanClosure(filterFun)
+
+      override def filter(value: F, ctx: Context[F]): Boolean = 
cleanFilter(value, ctx)
+    }
+    or(filter)
+  }
+
+  /**
     * Specifies a filter condition which has to be fulfilled by an event in 
order to be matched.
     *
     * @param filterFun Filter condition

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index cddc1ed..98c1fc9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -92,7 +92,7 @@ public class NFA<T> implements Serializable {
        private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
 
        /**
-        *      Used only for backward compatibility. Buffer used to store the 
matched events.
+        * Used only for backwards compatibility. Buffer used to store the 
matched events.
         */
        private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
@@ -575,7 +575,7 @@ public class NFA<T> implements Serializable {
                                computationState.getVersion());
 
                // for a given computation state, we cannot have more than one 
matching patterns.
-               Preconditions.checkArgument(paths.size() <= 1);
+               Preconditions.checkState(paths.size() <= 1);
 
                TypeSerializer<T> serializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
 
@@ -609,7 +609,7 @@ public class NFA<T> implements Serializable {
                        computationState.getVersion());
 
                // for a given computation state, we cannot have more than one 
matching patterns.
-               Preconditions.checkArgument(paths.size() <= 1);
+               Preconditions.checkState(paths.size() <= 1);
 
                List<Map<String, T>> result = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index f225e01..016cdef 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -88,7 +88,7 @@ public abstract class IterativeCondition<T> implements 
Function, Serializable {
 
                /**
                 * @return An {@link Iterable} over the already accepted 
elements
-                * for a given pattern. Elements are iterated in the order the 
were
+                * for a given pattern. Elements are iterated in the order they 
were
                 * inserted in the pattern.
                 *
                 * @param name The name of the pattern.

Reply via email to