Repository: flink
Updated Branches:
  refs/heads/master 97f178894 -> e5057b72c


[FLINK-6032] [cep] Clean-up operator state when not needed.

The CEP operator now clears the per-key state once it is
no longer needed:
1) the priority queue state is cleared when the queue is empty.
2) the NFA state is cleared when its shared buffer is empty.
3) finally, the key is unregistered from the watermark
   callback service when both of the above are empty.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5057b72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5057b72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5057b72

Branch: refs/heads/master
Commit: e5057b72c0749a75578665c4c86a47be33382b4a
Parents: 97f1788
Author: kl0u <kklou...@gmail.com>
Authored: Mon Mar 13 20:36:57 2017 +0100
Committer: kl0u <kklou...@gmail.com>
Committed: Thu Mar 16 11:36:47 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  93 +++--
 .../AbstractKeyedCEPPatternOperator.java        | 117 ++++--
 .../java/org/apache/flink/cep/CEPITCase.java    |   2 +-
 .../java/org/apache/flink/cep/SubEvent.java     |  12 +
 .../flink/cep/operator/CEPOperatorTest.java     | 407 ++++++++++++-------
 5 files changed, 412 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 0ff496f..8d87fd8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -54,52 +55,82 @@ import java.util.regex.Pattern;
 /**
  * Non-deterministic finite automaton implementation.
  * <p>
- * The NFA processes input events which will chnage the internal state 
machine. Whenever a final
- * state is reached, the matching sequence of events is emitted.
+ * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator 
CEP operator}
+ * keeps one NFA per key, for keyed input streams, and a single global NFA for 
non-keyed ones.
+ * When an event gets processed, it updates the NFA's internal state machine.
+ * <p>
+ * An event that belongs to a partially matched sequence is kept in an internal
+ * {@link SharedBuffer buffer}, which is a memory-optimized data-structure 
exactly for
+ * this purpose. Events in the buffer are removed when all the matched 
sequences that
+ * contain them are:
+ * <ol>
+ *     <li>emitted (success)</li>
+ *     <li>discarded (patterns containing NOT)</li>
+ *     <li>timed-out (windowed patterns)</li>
+ * </ol>
  *
  * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
  *
- * @see <a 
href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf";>https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ * @see <a 
href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf";>
+ *     https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
  *
  * @param <T> Type of the processed events
  */
 public class NFA<T> implements Serializable {
 
-       private static final Pattern namePattern = 
Pattern.compile("^(.*\\[)(\\])$");
        private static final long serialVersionUID = 2957674889294717265L;
 
+       private static final Pattern namePattern = 
Pattern.compile("^(.*\\[)(\\])$");
+
        private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
 
-       // Buffer used to store the matched events
+       /**
+        *      Buffer used to store the matched events.
+        */
        private final SharedBuffer<State<T>, T> sharedBuffer;
 
-       // Set of all NFA states
+       /**
+        * A set of all the valid NFA states, as returned by the
+        * {@link org.apache.flink.cep.nfa.compiler.NFACompiler NFACompiler}.
+        * These are directly derived from the user-specified pattern.
+        */
        private final Set<State<T>> states;
 
-       // Length of the window
+       /**
+        * The length of a windowed pattern, as specified using the
+        * {@link org.apache.flink.cep.pattern.Pattern#within(Time) 
Pattern.within(Time)}
+        * method.
+        */
        private final long windowTime;
 
+       /**
+        * A flag indicating if we want timed-out patterns (in case of windowed 
patterns)
+        * to be emitted ({@code true}), or silently discarded ({@code false}).
+        */
        private final boolean handleTimeout;
 
        // Current starting index for the next dewey version number
        private int startEventCounter;
 
-       // Current set of computation states within the state machine
+       /**
+        * Current set of {@link ComputationState computation states} within 
the state machine.
+        * These are the "active" intermediate states that are waiting for new 
matching
+        * events to transition to new valid states.
+        */
        private transient Queue<ComputationState<T>> computationStates;
 
        public NFA(
-               final TypeSerializer<T> eventSerializer,
-               final long windowTime,
-               final boolean handleTimeout) {
+                       final TypeSerializer<T> eventSerializer,
+                       final long windowTime,
+                       final boolean handleTimeout) {
 
                this.nonDuplicatingTypeSerializer = new 
NonDuplicatingTypeSerializer<>(eventSerializer);
                this.windowTime = windowTime;
                this.handleTimeout = handleTimeout;
-               sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
-               computationStates = new LinkedList<>();
-
-               states = new HashSet<>();
-               startEventCounter = 1;
+               this.sharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
+               this.computationStates = new LinkedList<>();
+               this.states = new HashSet<>();
+               this.startEventCounter = 1;
        }
 
        public Set<State<T>> getStates() {
@@ -121,6 +152,17 @@ public class NFA<T> implements Serializable {
        }
 
        /**
+        * Check if the NFA has finished processing all incoming data so far. 
That is
+        * when the buffer keeping the matches is empty.
+        *
+        * @return {@code true} if there are no elements in the {@link 
SharedBuffer},
+        * {@code false} otherwise.
+        */
+       public boolean isEmpty() {
+               return sharedBuffer.isEmpty();
+       }
+
+       /**
         * Processes the next input event. If some of the computations reach a 
final state then the
         * resulting event sequences are returned. If computations time out and 
timeout handling is
         * activated, then the timed out event patterns are returned.
@@ -186,15 +228,13 @@ public class NFA<T> implements Serializable {
                if(windowTime > 0L) {
                        long pruningTimestamp = timestamp - windowTime;
 
-                       // sanity check to guard against underflows
-                       if (pruningTimestamp >= timestamp) {
-                               throw new IllegalStateException("Detected an 
underflow in the pruning timestamp. This indicates that" +
-                                       " either the window length is too long 
(" + windowTime + ") or that the timestamp has not been" +
-                                       " set correctly (e.g. 
Long.MIN_VALUE).");
-                       }
+                       if (pruningTimestamp < timestamp) {
+                               // the check is to guard against underflows
 
-                       // remove all elements which are expired with respect 
to the window length
-                       sharedBuffer.prune(pruningTimestamp);
+                               // remove all elements which are expired
+                               // with respect to the window length
+                               sharedBuffer.prune(pruningTimestamp);
+                       }
                }
 
                return Tuple2.of(result, timeoutResult);
@@ -251,7 +291,7 @@ public class NFA<T> implements Serializable {
                        final T event,
                        final long timestamp) {
                Stack<State<T>> states = new Stack<>();
-               ArrayList<ComputationState<T>> resultingComputationStates = new 
ArrayList<>();
+               List<ComputationState<T>> resultingComputationStates = new 
ArrayList<>();
                State<T> state = computationState.getState();
 
                states.push(state);
@@ -381,7 +421,7 @@ public class NFA<T> implements Serializable {
                        computationState.getTimestamp(),
                        computationState.getVersion());
 
-               ArrayList<Map<String, T>> result = new ArrayList<>();
+               List<Map<String, T>> result = new ArrayList<>();
 
                TypeSerializer<T> serializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
 
@@ -497,6 +537,7 @@ public class NFA<T> implements Serializable {
         * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
         */
        public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index f534bec..de7daea 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -140,77 +141,58 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
        public void open() throws Exception {
                super.open();
 
-               InternalWatermarkCallbackService<KEY> watermarkCallbackService 
= getInternalWatermarkCallbackService();
+               final InternalWatermarkCallbackService<KEY> 
watermarkCallbackService = getInternalWatermarkCallbackService();
 
                watermarkCallbackService.setWatermarkCallback(
                        new OnWatermarkCallback<KEY>() {
 
                                @Override
                                public void onWatermark(KEY key, Watermark 
watermark) throws IOException {
-                                       setCurrentKey(key);
 
+                                       // 1) get the queue of pending elements 
for the key and the corresponding NFA,
+                                       // 2) process the pending elements in 
event time order by feeding them in the NFA
+                                       // 3) advance the time to the current 
watermark, so that expired patterns are discarded.
+                                       // 4) update the stored state for the 
key, by only storing the new NFA and priority queue iff they
+                                       //              have state to be used 
later.
+
+                                       // STEP 1
                                        PriorityQueue<StreamRecord<IN>> 
priorityQueue = getPriorityQueue();
                                        NFA<IN> nfa = getNFA();
 
-                                       if (priorityQueue.isEmpty()) {
-                                               advanceTime(nfa, 
watermark.getTimestamp());
-                                       } else {
-                                               while (!priorityQueue.isEmpty() 
&& priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) {
-                                                       StreamRecord<IN> 
streamRecord = priorityQueue.poll();
-                                                       processEvent(nfa, 
streamRecord.getValue(), streamRecord.getTimestamp());
-                                               }
+                                       // STEP 2
+                                       while (!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) {
+                                               StreamRecord<IN> streamRecord = 
priorityQueue.poll();
+                                               processEvent(nfa, 
streamRecord.getValue(), streamRecord.getTimestamp());
                                        }
 
-                                       updateNFA(nfa);
+                                       // STEP 3
+                                       advanceTime(nfa, 
watermark.getTimestamp());
+
+                                       // STEP 4
                                        updatePriorityQueue(priorityQueue);
+                                       updateNFA(nfa);
+
+                                       if (priorityQueue.isEmpty() && 
nfa.isEmpty()) {
+                                               
watermarkCallbackService.unregisterKeyFromWatermarkCallback(key);
+                                       }
                                }
                        },
                        keySerializer
                );
        }
 
-       private NFA<IN> getNFA() throws IOException {
-               NFA<IN> nfa = nfaOperatorState.value();
-
-               if (nfa == null) {
-                       nfa = nfaFactory.createNFA();
-
-                       nfaOperatorState.update(nfa);
-               }
-
-               return nfa;
-       }
-
-       private void updateNFA(NFA<IN> nfa) throws IOException {
-               nfaOperatorState.update(nfa);
-       }
-
-       private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
-               PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueOperatorState.value();
-
-               if (priorityQueue == null) {
-                       priorityQueue = 
priorityQueueFactory.createPriorityQueue();
-
-                       priorityQueueOperatorState.update(priorityQueue);
-               }
-
-               return priorityQueue;
-       }
-
-       private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) 
throws IOException {
-               priorityQueueOperatorState.update(queue);
-       }
-
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
-               
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
 
                if (isProcessingTime) {
                        // there can be no out of order elements in processing 
time
                        NFA<IN> nfa = getNFA();
-                       processEvent(nfa, element.getValue(), 
System.currentTimeMillis());
+                       processEvent(nfa, element.getValue(), 
getProcessingTimeService().getCurrentProcessingTime());
                        updateNFA(nfa);
+
                } else {
+                       
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
+
                        PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
 
                        // event time processing
@@ -225,6 +207,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                }
        }
 
+       private NFA<IN> getNFA() throws IOException {
+               NFA<IN> nfa = nfaOperatorState.value();
+               return nfa != null ? nfa : nfaFactory.createNFA();
+       }
+
+       private void updateNFA(NFA<IN> nfa) throws IOException {
+               if (nfa.isEmpty()) {
+                       nfaOperatorState.clear();
+               } else {
+                       nfaOperatorState.update(nfa);
+               }
+       }
+
+       private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
+               PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueOperatorState.value();
+               return priorityQueue != null ? priorityQueue : 
priorityQueueFactory.createPriorityQueue();
+       }
+
+       private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) 
throws IOException {
+               if (queue.isEmpty()) {
+                       priorityQueueOperatorState.clear();
+               } else {
+                       priorityQueueOperatorState.update(queue);
+               }
+       }
+
        /**
         * Process the given event by giving it to the NFA and outputting the 
produced set of matched
         * event sequences.
@@ -385,4 +393,25 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                        return getClass().hashCode();
                }
        }
+
+       //////////////////////                  Testing Methods                 
//////////////////////
+
+       @VisibleForTesting
+       public boolean hasNonEmptyNFA(KEY key) throws IOException {
+               setCurrentKey(key);
+               return nfaOperatorState.value() != null;
+       }
+
+       @VisibleForTesting
+       public boolean hasNonEmptyPQ(KEY key) throws IOException {
+               setCurrentKey(key);
+               return priorityQueueOperatorState.value() != null;
+       }
+
+       @VisibleForTesting
+       public int getPQSize(KEY key) throws IOException {
+               setCurrentKey(key);
+               PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue();
+               return pq == null ? -1 : pq.size();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 0f1f845..5887017 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -501,7 +501,7 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
                result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
 
                // the expected sequences of matching event ids
-               expected = "Left(1.0)\nRight(2.0,2.0,2.0)";
+               expected = 
"Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)";
 
                env.execute();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
index 31eff28..effb382 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
@@ -31,6 +31,18 @@ public class SubEvent extends Event {
        }
 
        @Override
+       public boolean equals(Object obj) {
+               return obj instanceof SubEvent &&
+                               super.equals(obj) &&
+                               ((SubEvent) obj).volume == volume;
+       }
+
+       @Override
+       public int hashCode() {
+               return super.hashCode() + (int) volume;
+       }
+
+       @Override
        public String toString() {
                StringBuilder builder = new StringBuilder();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 1899cb4..4ae74b9 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -49,7 +49,6 @@ import static org.junit.Assert.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
 
@@ -58,24 +57,8 @@ public class CEPOperatorTest extends TestLogger {
 
        @Test
        public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
-               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                       Event.createTypeSerializer(),
-                                       false,
-                                       keySelector,
-                                       IntSerializer.INSTANCE,
-                                       new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
 
                harness.open();
 
@@ -83,10 +66,7 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(expectedWatermark);
 
-               Object watermark = harness.getOutput().poll();
-
-               assertTrue(watermark instanceof Watermark);
-               assertEquals(expectedWatermark, watermark);
+               verifyWatermark(harness.getOutput().poll(), 42L);
 
                harness.close();
        }
@@ -94,24 +74,7 @@ public class CEPOperatorTest extends TestLogger {
        @Test
        public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
-               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
 
                harness.open();
 
@@ -119,22 +82,14 @@ public class CEPOperatorTest extends TestLogger {
                SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
                Event endEvent=  new Event(42, "end", 1.0);
 
-               harness.processElement(new StreamRecord<Event>(startEvent, 1));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<>(startEvent, 1L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
 
                // simulate snapshot/restore with some elements in internal 
sorting queue
-               OperatorStateHandles snapshot = harness.snapshot(0, 0);
+               OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
                harness.close();
 
-               harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               harness = getCepTestHarness(false);
 
                harness.setup();
                harness.initializeState(snapshot);
@@ -142,29 +97,21 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3L));
 
                // if element timestamps are not correctly 
checkpointed/restored this will lead to
                // a pruning time underflow exception in NFA
-               harness.processWatermark(new Watermark(2));
+               harness.processWatermark(new Watermark(2L));
 
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"start", 1.0), 4));
-               harness.processElement(new StreamRecord<Event>(endEvent, 5));
+               harness.processElement(new StreamRecord<Event>(middleEvent, 
3L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4L));
+               harness.processElement(new StreamRecord<>(endEvent, 5L));
 
                // simulate snapshot/restore with empty element queue but NFA 
state
-               OperatorStateHandles snapshot2 = harness.snapshot(1, 1);
+               OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
                harness.close();
 
-               harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               harness = getCepTestHarness(false);
 
                harness.setup();
                harness.initializeState(snapshot2);
@@ -172,22 +119,14 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(new Watermark(Long.MAX_VALUE));
 
-               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+               // get and verify the output
 
-               // watermark and the result
-               assertEquals(2, result.size());
-
-               Object resultObject = result.poll();
-               assertTrue(resultObject instanceof StreamRecord);
-               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-               assertTrue(resultRecord.getValue() instanceof Map);
+               Queue<Object> result = harness.getOutput();
 
-               @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               assertEquals(2, result.size());
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+               verifyWatermark(result.poll(), Long.MAX_VALUE);
 
                harness.close();
        }
@@ -199,24 +138,7 @@ public class CEPOperatorTest extends TestLogger {
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
-
-               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(false);
 
                harness.setStateBackend(rocksDBStateBackend);
 
@@ -226,22 +148,14 @@ public class CEPOperatorTest extends TestLogger {
                SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
                Event endEvent=  new Event(42, "end", 1.0);
 
-               harness.processElement(new StreamRecord<Event>(startEvent, 1));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<>(startEvent, 1L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
 
                // simulate snapshot/restore with some elements in internal 
sorting queue
-               OperatorStateHandles snapshot = harness.snapshot(0, 0);
+               OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
                harness.close();
 
-               harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               harness = getCepTestHarness(false);
 
                rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
@@ -253,25 +167,17 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3L));
 
                // if element timestamps are not correctly 
checkpointed/restored this will lead to
                // a pruning time underflow exception in NFA
-               harness.processWatermark(new Watermark(2));
+               harness.processWatermark(new Watermark(2L));
 
                // simulate snapshot/restore with empty element queue but NFA 
state
-               OperatorStateHandles snapshot2 = harness.snapshot(1, 1);
+               OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L);
                harness.close();
 
-               harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                               new KeyedCEPPatternOperator<>(
-                                               Event.createTypeSerializer(),
-                                               false,
-                                               keySelector,
-                                               IntSerializer.INSTANCE,
-                                               new NFAFactory()),
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
+               harness = getCepTestHarness(false);
 
                rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
@@ -280,28 +186,20 @@ public class CEPOperatorTest extends TestLogger {
                harness.initializeState(snapshot2);
                harness.open();
 
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-               harness.processElement(new StreamRecord<Event>(new Event(42, 
"start", 1.0), 4));
-               harness.processElement(new StreamRecord<Event>(endEvent, 5));
+               harness.processElement(new StreamRecord<Event>(middleEvent, 
3L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4L));
+               harness.processElement(new StreamRecord<>(endEvent, 5L));
 
                harness.processWatermark(new Watermark(Long.MAX_VALUE));
 
-               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+               // get and verify the output
 
-               // watermark and the result
-               assertEquals(2, result.size());
-
-               Object resultObject = result.poll();
-               assertTrue(resultObject instanceof StreamRecord);
-               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-               assertTrue(resultRecord.getValue() instanceof Map);
+               Queue<Object> result = harness.getOutput();
 
-               @SuppressWarnings("unchecked")
-               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               assertEquals(2, result.size());
 
-               assertEquals(startEvent, patternMap.get("start"));
-               assertEquals(middleEvent, patternMap.get("middle"));
-               assertEquals(endEvent, patternMap.get("end"));
+               verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
+               verifyWatermark(result.poll(), Long.MAX_VALUE);
 
                harness.close();
        }
@@ -311,14 +209,8 @@ public class CEPOperatorTest extends TestLogger {
         */
        @Test
        public void testKeyedAdvancingTimeWithoutElements() throws Exception {
-               final KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
-                       private static final long serialVersionUID = 
-4873366487571254798L;
-
-                       @Override
-                       public Integer getKey(Event value) throws Exception {
-                               return value.getId();
-                       }
-               };
+               final KeySelector<Event, Integer> keySelector = new 
TestKeySelector();
+
                final Event startEvent = new Event(42, "start", 1.0);
                final long watermarkTimestamp1 = 5L;
                final long watermarkTimestamp2 = 13L;
@@ -349,7 +241,7 @@ public class CEPOperatorTest extends TestLogger {
 
                        Queue<Object> result = harness.getOutput();
 
-                       assertEquals(3, result.size());
+                       assertEquals(3L, result.size());
 
                        Object watermark1 = result.poll();
 
@@ -382,6 +274,225 @@ public class CEPOperatorTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testCEPOperatorCleanupEventTime() throws Exception {
+
+               Event startEvent1 = new Event(42, "start", 1.0);
+               Event startEvent2 = new Event(42, "start", 2.0);
+               SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+               SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+               SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
+               Event endEvent1 =  new Event(42, "end", 1.0);
+               Event endEvent2 =  new Event(42, "end", 2.0);
+
+               Event startEventK2 = new Event(43, "start", 1.0);
+
+               TestKeySelector keySelector = new TestKeySelector();
+               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOpearator(false, keySelector);
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(operator);
+
+               harness.open();
+
+               harness.processWatermark(new Watermark(Long.MIN_VALUE));
+
+               harness.processElement(new StreamRecord<>(startEvent1, 1L));
+               harness.processElement(new StreamRecord<>(startEventK2, 1L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
2L));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3L));
+
+               // there must be 2 keys 42, 43 registered for the watermark 
callback
+               // all the seen elements must be in the priority queues but no 
NFA yet.
+
+               assertEquals(2L, harness.numKeysForWatermarkCallback());
+               assertEquals(4L, operator.getPQSize(42));
+               assertEquals(1L, operator.getPQSize(43));
+               assertTrue(!operator.hasNonEmptyNFA(42));
+               assertTrue(!operator.hasNonEmptyNFA(43));
+
+               harness.processWatermark(new Watermark(2L));
+
+               verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
+               verifyWatermark(harness.getOutput().poll(), 2L);
+
+               // still the 2 keys
+               // one element in PQ for 42 (the barfoo) as it arrived early
+               // for 43 the element entered the NFA and the PQ is empty
+
+               assertEquals(2L, harness.numKeysForWatermarkCallback());
+               assertTrue(operator.hasNonEmptyNFA(42));
+               assertEquals(1L, operator.getPQSize(42));
+               assertTrue(operator.hasNonEmptyNFA(43));
+               assertTrue(!operator.hasNonEmptyPQ(43));
+
+               harness.processElement(new StreamRecord<>(startEvent2, 4L));
+               harness.processElement(new StreamRecord<Event>(middleEvent2, 
5L));
+               harness.processElement(new StreamRecord<>(endEvent1, 6L));
+               harness.processWatermark(11L);
+               harness.processWatermark(12L);
+
+               // now we have 1 key because the 43 expired and was removed.
+               // 42 is still there due to startEvent2
+               assertEquals(1L, harness.numKeysForWatermarkCallback());
+               assertTrue(operator.hasNonEmptyNFA(42));
+               assertTrue(!operator.hasNonEmptyPQ(42));
+               assertTrue(!operator.hasNonEmptyNFA(43));
+               assertTrue(!operator.hasNonEmptyPQ(43));
+
+               verifyPattern(harness.getOutput().poll(), startEvent1, 
middleEvent1, endEvent1);
+               verifyPattern(harness.getOutput().poll(), startEvent1, 
middleEvent2, endEvent1);
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent1);
+               verifyWatermark(harness.getOutput().poll(), 11L);
+               verifyWatermark(harness.getOutput().poll(), 12L);
+
+               harness.processElement(new StreamRecord<Event>(middleEvent3, 
12L));
+               harness.processElement(new StreamRecord<>(endEvent2, 13L));
+               harness.processWatermark(20L);
+               harness.processWatermark(21L);
+
+               assertTrue(!operator.hasNonEmptyNFA(42));
+               assertTrue(!operator.hasNonEmptyPQ(42));
+               assertEquals(0L, harness.numKeysForWatermarkCallback());
+
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+               verifyWatermark(harness.getOutput().poll(), 20L);
+               verifyWatermark(harness.getOutput().poll(), 21L);
+
+               harness.close();
+       }
+
+       @Test
+       public void testCEPOperatorCleanupProcessingTime() throws Exception {
+
+               Event startEvent1 = new Event(42, "start", 1.0);
+               Event startEvent2 = new Event(42, "start", 2.0);
+               SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+               SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
+               SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
+               Event endEvent1 =  new Event(42, "end", 1.0);
+               Event endEvent2 =  new Event(42, "end", 2.0);
+
+               Event startEventK2 = new Event(43, "start", 1.0);
+
+               TestKeySelector keySelector = new TestKeySelector();
+               KeyedCEPPatternOperator<Event, Integer> operator = 
getKeyedCepOpearator(true, keySelector);
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness = getCepTestHarness(operator);
+
+               harness.open();
+
+               harness.setProcessingTime(0L);
+
+               harness.processElement(new StreamRecord<>(startEvent1, 1L));
+               harness.processElement(new StreamRecord<>(startEventK2, 1L));
+               harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
2L));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3L));
+
+               assertTrue(!operator.hasNonEmptyPQ(42));
+               assertTrue(!operator.hasNonEmptyPQ(43));
+               assertTrue(operator.hasNonEmptyNFA(42));
+               assertTrue(operator.hasNonEmptyNFA(43));
+
+               harness.setProcessingTime(3L);
+
+               harness.processElement(new StreamRecord<>(startEvent2, 3L));
+               harness.processElement(new StreamRecord<Event>(middleEvent2, 
4L));
+               harness.processElement(new StreamRecord<>(endEvent1, 5L));
+
+               verifyPattern(harness.getOutput().poll(), startEvent1, 
middleEvent1, endEvent1);
+               verifyPattern(harness.getOutput().poll(), startEvent1, 
middleEvent2, endEvent1);
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent1);
+
+               harness.setProcessingTime(11L);
+
+               harness.processElement(new StreamRecord<Event>(middleEvent3, 
11L));
+               harness.processElement(new StreamRecord<>(endEvent2, 12L));
+
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
+               verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+
+               harness.setProcessingTime(21L);
+
+               assertTrue(operator.hasNonEmptyNFA(42));
+
+               harness.processElement(new StreamRecord<>(startEvent1, 21L));
+               assertTrue(operator.hasNonEmptyNFA(42));
+
+               harness.setProcessingTime(49L);
+
+               // TODO: 3/13/17 we have to have another event in order to 
clean up
+               harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
+
+               // the pattern expired
+               assertTrue(!operator.hasNonEmptyNFA(42));
+
+               assertEquals(0L, harness.numKeysForWatermarkCallback());
+               assertTrue(!operator.hasNonEmptyPQ(42));
+               assertTrue(!operator.hasNonEmptyPQ(43));
+
+               harness.close();
+       }
+
+       private void verifyWatermark(Object outputObject, long timestamp) {
+               assertTrue(outputObject instanceof Watermark);
+               assertEquals(timestamp, ((Watermark) 
outputObject).getTimestamp());
+       }
+
+       private void verifyPattern(Object outputObject, Event start, SubEvent 
middle, Event end) {
+               assertTrue(outputObject instanceof StreamRecord);
+
+               StreamRecord<?> resultRecord = (StreamRecord<?>) outputObject;
+               assertTrue(resultRecord.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+               assertEquals(start, patternMap.get("start"));
+               assertEquals(middle, patternMap.get("middle"));
+               assertEquals(end, patternMap.get("end"));
+       }
+
+       private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
getCepTestHarness(boolean isProcessingTime) throws Exception {
+               KeySelector<Event, Integer> keySelector = new TestKeySelector();
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                       getKeyedCepOpearator(isProcessingTime, keySelector),
+                       keySelector,
+                       BasicTypeInfo.INT_TYPE_INFO);
+       }
+
+       private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
getCepTestHarness(
+                       KeyedCEPPatternOperator<Event, Integer> cepOperator) 
throws Exception {
+               KeySelector<Event, Integer> keySelector = new TestKeySelector();
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                       cepOperator,
+                       keySelector,
+                       BasicTypeInfo.INT_TYPE_INFO);
+       }
+
+       private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOpearator(
+                       boolean isProcessingTime,
+                       KeySelector<Event, Integer> keySelector) {
+
+               return new KeyedCEPPatternOperator<>(
+                       Event.createTypeSerializer(),
+                       isProcessingTime,
+                       keySelector,
+                       IntSerializer.INSTANCE,
+                       new NFAFactory());
+       }
+
+       private static class TestKeySelector implements KeySelector<Event, 
Integer> {
+
+               private static final long serialVersionUID = 
-4873366487571254798L;
+
+               @Override
+               public Integer getKey(Event value) throws Exception {
+                       return value.getId();
+               }
+       }
+
        private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {
 
                private static final long serialVersionUID = 
1173020762472766713L;

Reply via email to