Repository: flink Updated Branches: refs/heads/master 97f178894 -> e5057b72c
[FLINK-6032] [cep] Clean-up operator state when not needed. The CEP operator now cleans the registered state for a key. This happens: 1) for the priority queue, when the queue is empty. 2) for the NFA, when its shared buffer is empty. 3) finally the key is removed from the watermark callback service if both the above are empty. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5057b72 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5057b72 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5057b72 Branch: refs/heads/master Commit: e5057b72c0749a75578665c4c86a47be33382b4a Parents: 97f1788 Author: kl0u <kklou...@gmail.com> Authored: Mon Mar 13 20:36:57 2017 +0100 Committer: kl0u <kklou...@gmail.com> Committed: Thu Mar 16 11:36:47 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 93 +++-- .../AbstractKeyedCEPPatternOperator.java | 117 ++++-- .../java/org/apache/flink/cep/CEPITCase.java | 2 +- .../java/org/apache/flink/cep/SubEvent.java | 12 + .../flink/cep/operator/CEPOperatorTest.java | 407 ++++++++++++------- 5 files changed, 412 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 0ff496f..8d87fd8 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView; import 
org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.windowing.time.Time; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -54,52 +55,82 @@ import java.util.regex.Pattern; /** * Non-deterministic finite automaton implementation. * <p> - * The NFA processes input events which will chnage the internal state machine. Whenever a final - * state is reached, the matching sequence of events is emitted. + * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator} + * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones. + * When an event gets processed, it updates the NFA's internal state machine. + * <p> + * An event that belongs to a partially matched sequence is kept in an internal + * {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for + * this purpose. Events in the buffer are removed when all the matched sequences that + * contain them are: + * <ol> + * <li>emitted (success)</li> + * <li>discarded (patterns containing NOT)</li> + * <li>timed-out (windowed patterns)</li> + * </ol> * * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
* - * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> * * @param <T> Type of the processed events */ public class NFA<T> implements Serializable { - private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$"); private static final long serialVersionUID = 2957674889294717265L; + private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$"); + private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer; - // Buffer used to store the matched events + /** + * Buffer used to store the matched events. + */ private final SharedBuffer<State<T>, T> sharedBuffer; - // Set of all NFA states + /** + * A set of all the valid NFA states, as returned by the + * {@link org.apache.flink.cep.nfa.compiler.NFACompiler NFACompiler}. + * These are directly derived from the user-specified pattern. + */ private final Set<State<T>> states; - // Length of the window + /** + * The length of a windowed pattern, as specified using the + * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)} + * method. + */ private final long windowTime; + /** + * A flag indicating if we want timed-out patterns (in case of windowed patterns) + * to be emitted ({@code true}), or silently discarded ({@code false}). + */ private final boolean handleTimeout; // Current starting index for the next dewey version number private int startEventCounter; - // Current set of computation states within the state machine + /** + * Current set of {@link ComputationState computation states} within the state machine. + * These are the "active" intermediate states that are waiting for new matching + * events to transition to new valid states. 
+ */ private transient Queue<ComputationState<T>> computationStates; public NFA( - final TypeSerializer<T> eventSerializer, - final long windowTime, - final boolean handleTimeout) { + final TypeSerializer<T> eventSerializer, + final long windowTime, + final boolean handleTimeout) { this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); this.windowTime = windowTime; this.handleTimeout = handleTimeout; - sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); - computationStates = new LinkedList<>(); - - states = new HashSet<>(); - startEventCounter = 1; + this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + this.computationStates = new LinkedList<>(); + this.states = new HashSet<>(); + this.startEventCounter = 1; } public Set<State<T>> getStates() { @@ -121,6 +152,17 @@ public class NFA<T> implements Serializable { } /** + * Check if the NFA has finished processing all incoming data so far. That is + * when the buffer keeping the matches is empty. + * + * @return {@code true} if there are no elements in the {@link SharedBuffer}, + * {@code false} otherwise. + */ + public boolean isEmpty() { + return sharedBuffer.isEmpty(); + } + + /** * Processes the next input event. If some of the computations reach a final state then the * resulting event sequences are returned. If computations time out and timeout handling is * activated, then the timed out event patterns are returned. @@ -186,15 +228,13 @@ public class NFA<T> implements Serializable { if(windowTime > 0L) { long pruningTimestamp = timestamp - windowTime; - // sanity check to guard against underflows - if (pruningTimestamp >= timestamp) { - throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" + - " either the window length is too long (" + windowTime + ") or that the timestamp has not been" + - " set correctly (e.g. 
Long.MIN_VALUE)."); - } + if (pruningTimestamp < timestamp) { + // the check is to guard against underflows - // remove all elements which are expired with respect to the window length - sharedBuffer.prune(pruningTimestamp); + // remove all elements which are expired + // with respect to the window length + sharedBuffer.prune(pruningTimestamp); + } } return Tuple2.of(result, timeoutResult); @@ -251,7 +291,7 @@ public class NFA<T> implements Serializable { final T event, final long timestamp) { Stack<State<T>> states = new Stack<>(); - ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>(); + List<ComputationState<T>> resultingComputationStates = new ArrayList<>(); State<T> state = computationState.getState(); states.push(state); @@ -381,7 +421,7 @@ public class NFA<T> implements Serializable { computationState.getTimestamp(), computationState.getVersion()); - ArrayList<Map<String, T>> result = new ArrayList<>(); + List<Map<String, T>> result = new ArrayList<>(); TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); @@ -497,6 +537,7 @@ public class NFA<T> implements Serializable { * {@link TypeSerializer} for {@link NFA} that uses Java Serialization. 
*/ public static class Serializer<T> extends TypeSerializer<NFA<T>> { + private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index f534bec..de7daea 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.cep.operator; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -140,77 +141,58 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> public void open() throws Exception { super.open(); - InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService(); + final InternalWatermarkCallbackService<KEY> watermarkCallbackService = getInternalWatermarkCallbackService(); watermarkCallbackService.setWatermarkCallback( new OnWatermarkCallback<KEY>() { @Override public void onWatermark(KEY key, Watermark watermark) throws IOException { - setCurrentKey(key); + // 1) get the queue of pending elements for the key and the corresponding NFA, + // 2) process the pending elements in event time order by feeding them in the NFA + // 3) advance the time to the current watermark, so that expired patterns are discarded. 
+ // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they + // have state to be used later. + + // STEP 1 PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); NFA<IN> nfa = getNFA(); - if (priorityQueue.isEmpty()) { - advanceTime(nfa, watermark.getTimestamp()); - } else { - while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) { - StreamRecord<IN> streamRecord = priorityQueue.poll(); - processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); - } + // STEP 2 + while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= watermark.getTimestamp()) { + StreamRecord<IN> streamRecord = priorityQueue.poll(); + processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); } - updateNFA(nfa); + // STEP 3 + advanceTime(nfa, watermark.getTimestamp()); + + // STEP 4 updatePriorityQueue(priorityQueue); + updateNFA(nfa); + + if (priorityQueue.isEmpty() && nfa.isEmpty()) { + watermarkCallbackService.unregisterKeyFromWatermarkCallback(key); + } } }, keySerializer ); } - private NFA<IN> getNFA() throws IOException { - NFA<IN> nfa = nfaOperatorState.value(); - - if (nfa == null) { - nfa = nfaFactory.createNFA(); - - nfaOperatorState.update(nfa); - } - - return nfa; - } - - private void updateNFA(NFA<IN> nfa) throws IOException { - nfaOperatorState.update(nfa); - } - - private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { - PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value(); - - if (priorityQueue == null) { - priorityQueue = priorityQueueFactory.createPriorityQueue(); - - priorityQueueOperatorState.update(priorityQueue); - } - - return priorityQueue; - } - - private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException { - priorityQueueOperatorState.update(queue); - } - @Override public void processElement(StreamRecord<IN> element) throws Exception { - 
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); if (isProcessingTime) { // there can be no out of order elements in processing time NFA<IN> nfa = getNFA(); - processEvent(nfa, element.getValue(), System.currentTimeMillis()); + processEvent(nfa, element.getValue(), getProcessingTimeService().getCurrentProcessingTime()); updateNFA(nfa); + } else { + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); // event time processing @@ -225,6 +207,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> } } + private NFA<IN> getNFA() throws IOException { + NFA<IN> nfa = nfaOperatorState.value(); + return nfa != null ? nfa : nfaFactory.createNFA(); + } + + private void updateNFA(NFA<IN> nfa) throws IOException { + if (nfa.isEmpty()) { + nfaOperatorState.clear(); + } else { + nfaOperatorState.update(nfa); + } + } + + private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { + PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value(); + return priorityQueue != null ? priorityQueue : priorityQueueFactory.createPriorityQueue(); + } + + private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException { + if (queue.isEmpty()) { + priorityQueueOperatorState.clear(); + } else { + priorityQueueOperatorState.update(queue); + } + } + /** * Process the given event by giving it to the NFA and outputting the produced set of matched * event sequences. 
@@ -385,4 +393,25 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> return getClass().hashCode(); } } + + ////////////////////// Testing Methods ////////////////////// + + @VisibleForTesting + public boolean hasNonEmptyNFA(KEY key) throws IOException { + setCurrentKey(key); + return nfaOperatorState.value() != null; + } + + @VisibleForTesting + public boolean hasNonEmptyPQ(KEY key) throws IOException { + setCurrentKey(key); + return priorityQueueOperatorState.value() != null; + } + + @VisibleForTesting + public int getPQSize(KEY key) throws IOException { + setCurrentKey(key); + PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue(); + return pq == null ? -1 : pq.size(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 0f1f845..5887017 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -501,7 +501,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); // the expected sequences of matching event ids - expected = "Left(1.0)\nRight(2.0,2.0,2.0)"; + expected = "Left(1.0)\nLeft(2.0)\nLeft(2.0)\nRight(2.0,2.0,2.0)"; env.execute(); } http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java index 31eff28..effb382 
100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java @@ -31,6 +31,18 @@ public class SubEvent extends Event { } @Override + public boolean equals(Object obj) { + return obj instanceof SubEvent && + super.equals(obj) && + ((SubEvent) obj).volume == volume; + } + + @Override + public int hashCode() { + return super.hashCode() + (int) volume; + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/flink/blob/e5057b72/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 1899cb4..4ae74b9 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -49,7 +49,6 @@ import static org.junit.Assert.*; import java.util.HashMap; import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; public class CEPOperatorTest extends TestLogger { @@ -58,24 +57,8 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorWatermarkForwarding() throws Exception { - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - 
Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); harness.open(); @@ -83,10 +66,7 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(expectedWatermark); - Object watermark = harness.getOutput().poll(); - - assertTrue(watermark instanceof Watermark); - assertEquals(expectedWatermark, watermark); + verifyWatermark(harness.getOutput().poll(), 42L); harness.close(); } @@ -94,24 +74,7 @@ public class CEPOperatorTest extends TestLogger { @Test public void testKeyedCEPOperatorCheckpointing() throws Exception { - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); harness.open(); @@ -119,22 +82,14 @@ public class CEPOperatorTest extends TestLogger { SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent= new Event(42, "end", 1.0); - harness.processElement(new StreamRecord<Event>(startEvent, 1)); - harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0, 
0); + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); harness.setup(); harness.initializeState(snapshot); @@ -142,29 +97,21 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(Long.MIN_VALUE)); - harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); // if element timestamps are not correctly checkpointed/restored this will lead to // a pruning time underflow exception in NFA - harness.processWatermark(new Watermark(2)); + harness.processWatermark(new Watermark(2L)); - harness.processElement(new StreamRecord<Event>(middleEvent, 3)); - harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord<Event>(endEvent, 5)); + harness.processElement(new StreamRecord<Event>(middleEvent, 3L)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L)); + harness.processElement(new StreamRecord<>(endEvent, 5L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1, 1); + OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); harness.setup(); harness.initializeState(snapshot2); @@ -172,22 +119,14 @@ public class CEPOperatorTest extends TestLogger { 
harness.processWatermark(new Watermark(Long.MAX_VALUE)); - ConcurrentLinkedQueue<Object> result = harness.getOutput(); + // get and verify the output - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); + Queue<Object> result = harness.getOutput(); - @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + assertEquals(2, result.size()); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + verifyWatermark(result.poll(), Long.MAX_VALUE); harness.close(); } @@ -199,24 +138,7 @@ public class CEPOperatorTest extends TestLogger { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(false); harness.setStateBackend(rocksDBStateBackend); @@ -226,22 +148,14 @@ public class CEPOperatorTest extends TestLogger { SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); Event endEvent= new Event(42, "end", 
1.0); - harness.processElement(new StreamRecord<Event>(startEvent, 1)); - harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<>(startEvent, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0, 0); + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -253,25 +167,17 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(Long.MIN_VALUE)); - harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); // if element timestamps are not correctly checkpointed/restored this will lead to // a pruning time underflow exception in NFA - harness.processWatermark(new Watermark(2)); + harness.processWatermark(new Watermark(2L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1, 1); + OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - keySelector, - IntSerializer.INSTANCE, - new NFAFactory()), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); + harness = getCepTestHarness(false); rocksDBStateBackend = new RocksDBStateBackend(new 
MemoryStateBackend()); rocksDBStateBackend.setDbStoragePath(rocksDbPath); @@ -280,28 +186,20 @@ public class CEPOperatorTest extends TestLogger { harness.initializeState(snapshot2); harness.open(); - harness.processElement(new StreamRecord<Event>(middleEvent, 3)); - harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord<Event>(endEvent, 5)); + harness.processElement(new StreamRecord<Event>(middleEvent, 3L)); + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L)); + harness.processElement(new StreamRecord<>(endEvent, 5L)); harness.processWatermark(new Watermark(Long.MAX_VALUE)); - ConcurrentLinkedQueue<Object> result = harness.getOutput(); + // get and verify the output - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); + Queue<Object> result = harness.getOutput(); - @SuppressWarnings("unchecked") - Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + assertEquals(2, result.size()); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + verifyPattern(result.poll(), startEvent, middleEvent, endEvent); + verifyWatermark(result.poll(), Long.MAX_VALUE); harness.close(); } @@ -311,14 +209,8 @@ public class CEPOperatorTest extends TestLogger { */ @Test public void testKeyedAdvancingTimeWithoutElements() throws Exception { - final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; + final KeySelector<Event, Integer> keySelector = new 
TestKeySelector(); + final Event startEvent = new Event(42, "start", 1.0); final long watermarkTimestamp1 = 5L; final long watermarkTimestamp2 = 13L; @@ -349,7 +241,7 @@ public class CEPOperatorTest extends TestLogger { Queue<Object> result = harness.getOutput(); - assertEquals(3, result.size()); + assertEquals(3L, result.size()); Object watermark1 = result.poll(); @@ -382,6 +274,225 @@ public class CEPOperatorTest extends TestLogger { } } + @Test + public void testCEPOperatorCleanupEventTime() throws Exception { + + Event startEvent1 = new Event(42, "start", 1.0); + Event startEvent2 = new Event(42, "start", 2.0); + SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0); + SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0); + Event endEvent1 = new Event(42, "end", 1.0); + Event endEvent2 = new Event(42, "end", 2.0); + + Event startEventK2 = new Event(43, "start", 1.0); + + TestKeySelector keySelector = new TestKeySelector(); + KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector); + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator); + + harness.open(); + + harness.processWatermark(new Watermark(Long.MIN_VALUE)); + + harness.processElement(new StreamRecord<>(startEvent1, 1L)); + harness.processElement(new StreamRecord<>(startEventK2, 1L)); + harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); + harness.processElement(new StreamRecord<Event>(middleEvent1, 2L)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L)); + + // there must be 2 keys 42, 43 registered for the watermark callback + // all the seen elements must be in the priority queues but no NFA yet. 
// --- tail of the event-time cleanup test (method opens above this chunk) ---
// Before any watermark fires: both keys registered, all 5 early elements buffered
// in the PQs (4 for key 42, 1 for key 43), nothing has entered the NFAs yet.
assertEquals(2L, harness.numKeysForWatermarkCallback());
assertEquals(4L, operator.getPQSize(42));
assertEquals(1L, operator.getPQSize(43));
assertTrue(!operator.hasNonEmptyNFA(42));
assertTrue(!operator.hasNonEmptyNFA(43));

harness.processWatermark(new Watermark(2L));

// First emitted watermark is the initial Long.MIN_VALUE one, then ours.
verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
verifyWatermark(harness.getOutput().poll(), 2L);

// still the 2 keys
// one element in PQ for 42 (the barfoo) as it arrived early
// for 43 the element entered the NFA and the PQ is empty

assertEquals(2L, harness.numKeysForWatermarkCallback());
assertTrue(operator.hasNonEmptyNFA(42));
assertEquals(1L, operator.getPQSize(42));
assertTrue(operator.hasNonEmptyNFA(43));
assertTrue(!operator.hasNonEmptyPQ(43));

harness.processElement(new StreamRecord<>(startEvent2, 4L));
harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
harness.processElement(new StreamRecord<>(endEvent1, 6L));
harness.processWatermark(11L);
harness.processWatermark(12L);

// now we have 1 key because the 43 expired and was removed.
// 42 is still there due to startEvent2
assertEquals(1L, harness.numKeysForWatermarkCallback());
assertTrue(operator.hasNonEmptyNFA(42));
assertTrue(!operator.hasNonEmptyPQ(42));
assertTrue(!operator.hasNonEmptyNFA(43));
assertTrue(!operator.hasNonEmptyPQ(43));

verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);

harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 13L));
harness.processWatermark(20L);
harness.processWatermark(21L);

// After the last matches complete and the watermark passes, ALL per-key state
// (NFA, PQ, watermark-callback registration) must be gone — the point of FLINK-6032.
assertTrue(!operator.hasNonEmptyNFA(42));
assertTrue(!operator.hasNonEmptyPQ(42));
assertEquals(0L, harness.numKeysForWatermarkCallback());

verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);

harness.close();
}

/**
 * Verifies that in processing time the CEP operator cleans up its per-key state
 * (NFA and element priority queue) once patterns expire, and that the key is
 * dropped from the watermark callback service when both are empty.
 */
@Test
public void testCEPOperatorCleanupProcessingTime() throws Exception {

	Event startEvent1 = new Event(42, "start", 1.0);
	Event startEvent2 = new Event(42, "start", 2.0);
	SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
	SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
	SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
	Event endEvent1 = new Event(42, "end", 1.0);
	Event endEvent2 = new Event(42, "end", 2.0);

	// a second key (43) so cleanup can be checked independently per key
	Event startEventK2 = new Event(43, "start", 1.0);

	TestKeySelector keySelector = new TestKeySelector();
	// true -> processing-time semantics
	KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector);
	OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = getCepTestHarness(operator);

	harness.open();

	harness.setProcessingTime(0L);

	harness.processElement(new StreamRecord<>(startEvent1, 1L));
	harness.processElement(new StreamRecord<>(startEventK2, 1L));
	harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
	harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
	harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));

	// in processing time elements are fed straight to the NFA — no PQ buffering
	assertTrue(!operator.hasNonEmptyPQ(42));
	assertTrue(!operator.hasNonEmptyPQ(43));
	assertTrue(operator.hasNonEmptyNFA(42));
	assertTrue(operator.hasNonEmptyNFA(43));

	harness.setProcessingTime(3L);

	harness.processElement(new StreamRecord<>(startEvent2, 3L));
	harness.processElement(new StreamRecord<Event>(middleEvent2, 4L));
	harness.processElement(new StreamRecord<>(endEvent1, 5L));

	// endEvent1 completes three overlapping start->middle->end matches for key 42
	verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
	verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
	verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);

	harness.setProcessingTime(11L);

	harness.processElement(new StreamRecord<Event>(middleEvent3, 11L));
	harness.processElement(new StreamRecord<>(endEvent2, 12L));

	verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
	verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);

	harness.setProcessingTime(21L);

	assertTrue(operator.hasNonEmptyNFA(42));

	harness.processElement(new StreamRecord<>(startEvent1, 21L));
	assertTrue(operator.hasNonEmptyNFA(42));

	harness.setProcessingTime(49L);

	// TODO: 3/13/17 we have to have another event in order to clean up
	harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));

	// the pattern expired
	assertTrue(!operator.hasNonEmptyNFA(42));

	// with NFA and PQ empty for every key, no key stays registered for callbacks
	assertEquals(0L, harness.numKeysForWatermarkCallback());

	assertTrue(!operator.hasNonEmptyPQ(42));
	assertTrue(!operator.hasNonEmptyPQ(43));

	harness.close();
}

/**
 * Asserts that the given operator output is a {@link Watermark} carrying the
 * expected timestamp.
 */
private void verifyWatermark(Object outputObject, long timestamp) {
	assertTrue(outputObject instanceof Watermark);
	assertEquals(timestamp, ((Watermark) outputObject).getTimestamp());
}

/**
 * Asserts that the given operator output is a {@link StreamRecord} holding a
 * pattern map with exactly the expected "start"/"middle"/"end" events.
 */
private void verifyPattern(Object outputObject, Event start, SubEvent middle, Event end) {
	assertTrue(outputObject instanceof StreamRecord);

	StreamRecord<?> resultRecord = (StreamRecord<?>) outputObject;
	assertTrue(resultRecord.getValue() instanceof Map);

	@SuppressWarnings("unchecked")
	Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
	assertEquals(start, patternMap.get("start"));
	assertEquals(middle, patternMap.get("middle"));
	assertEquals(end, patternMap.get("end"));
}

/**
 * Creates a keyed test harness around a freshly-built CEP operator.
 *
 * @param isProcessingTime whether the operator works on processing time
 */
private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness(boolean isProcessingTime) throws Exception {
	KeySelector<Event, Integer> keySelector = new TestKeySelector();

	return new KeyedOneInputStreamOperatorTestHarness<>(
		getKeyedCepOpearator(isProcessingTime, keySelector),
		keySelector,
		BasicTypeInfo.INT_TYPE_INFO);
}

/**
 * Creates a keyed test harness around the supplied CEP operator instance
 * (used by tests that need to poke at the operator's internal state).
 */
private OneInputStreamOperatorTestHarness<Event, Map<String, Event>> getCepTestHarness(
		KeyedCEPPatternOperator<Event, Integer> cepOperator) throws Exception {
	KeySelector<Event, Integer> keySelector = new TestKeySelector();

	return new KeyedOneInputStreamOperatorTestHarness<>(
		cepOperator,
		keySelector,
		BasicTypeInfo.INT_TYPE_INFO);
}

/**
 * Builds a {@link KeyedCEPPatternOperator} over {@code Event}s keyed by int,
 * using the test {@link NFAFactory}.
 *
 * <p>NOTE(review): "Opearator" is a typo for "Operator"; renaming would touch
 * every call site in this test class, so it is left as-is here.
 */
private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOpearator(
		boolean isProcessingTime,
		KeySelector<Event, Integer> keySelector) {

	return new KeyedCEPPatternOperator<>(
		Event.createTypeSerializer(),
		isProcessingTime,
		keySelector,
		IntSerializer.INSTANCE,
		new NFAFactory());
}

/** Key selector that keys each {@link Event} by its integer id. */
private static class TestKeySelector implements KeySelector<Event, Integer> {

	private static final long serialVersionUID = -4873366487571254798L;

	@Override
	public Integer getKey(Event value) throws Exception {
		return value.getId();
	}
}

// --- NFAFactory continues below this chunk ---
private static class NFAFactory implements NFACompiler.NFAFactory<Event> {

	private static final long serialVersionUID = 1173020762472766713L;