Repository: flink
Updated Branches:
  refs/heads/master 6f570e7b6 -> 546e2ad73


[FLINK-6656] [cep] Change element PriorityQueue to MapState.

This is to leverage the fact that RocksDB already returns the
keys sorted. So now elements, instead of being stored in a PQ
and all of them being deserialized and serialized at each incoming
element, are stored in a MapState with the key being the
timestamp and the value being a List of elements that refer to the
same timestamp.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/546e2ad7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/546e2ad7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/546e2ad7

Branch: refs/heads/master
Commit: 546e2ad73165b00d93c3c460372b9d49a4b5d8b7
Parents: 6f570e7
Author: kkloudas <[email protected]>
Authored: Mon May 22 11:43:42 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Tue May 23 10:48:34 2017 +0200

----------------------------------------------------------------------
 .../AbstractKeyedCEPPatternOperator.java        | 152 +++++++++++++------
 .../cep/operator/CEPMigration11to13Test.java    |   7 +-
 .../flink/cep/operator/CEPOperatorTest.java     |  19 +--
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 5612 -> 5674 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 3274 -> 3336 bytes
 5 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/546e2ad7/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7b6e5e3..af4b53e 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -29,6 +31,8 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -54,6 +58,10 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
 
@@ -86,12 +94,11 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
        ///////////////                 State                   //////////////
 
        private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorStateName";
-       private static final String PRIORITY_QUEUE_STATE_NAME = 
"priorityQueueStateName";
+       private static final String EVENT_QUEUE_STATE_NAME = 
"eventQueuesStateName";
 
        private transient ValueState<NFA<IN>> nfaOperatorState;
-       private transient ValueState<PriorityQueue<StreamRecord<IN>>> 
priorityQueueOperatorState;
+       private transient MapState<Long, List<IN>> elementQueueState;
 
-       private final PriorityQueueFactory<StreamRecord<IN>> 
priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
        private final NFACompiler.NFAFactory<IN> nfaFactory;
 
        private transient InternalTimerService<VoidNamespace> timerService;
@@ -134,19 +141,13 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                                new 
NFA.NFASerializer<>(inputSerializer)));
                }
 
-               @SuppressWarnings("unchecked,rawtypes")
-               TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-                       (TypeSerializer) new 
StreamElementSerializer<>(inputSerializer);
-
-               if (priorityQueueOperatorState == null) {
-                       priorityQueueOperatorState = 
getRuntimeContext().getState(
-                               new ValueStateDescriptor<>(
-                                       PRIORITY_QUEUE_STATE_NAME,
-                                       new PriorityQueueSerializer<>(
-                                               streamRecordSerializer,
-                                               new 
PriorityQueueStreamRecordFactory<IN>()
+               if (elementQueueState == null) {
+                       elementQueueState = getRuntimeContext().getMapState(
+                                       new MapStateDescriptor<>(
+                                                       EVENT_QUEUE_STATE_NAME,
+                                                       LongSerializer.INSTANCE,
+                                                       new 
ListSerializer<>(inputSerializer)
                                        )
-                               )
                        );
                }
        }
@@ -171,25 +172,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
 
                } else {
 
+                       long timestamp = element.getTimestamp();
+                       IN value = element.getValue();
+
                        // In event-time processing we assume correctness of 
the watermark.
                        // Events with timestamp smaller than the last seen 
watermark are considered late.
                        // Late events are put in a dedicated side output, if 
the user has specified one.
 
-                       if (element.getTimestamp() >= lastWatermark) {
+                       if (timestamp >= lastWatermark) {
 
                                // we have an event with a valid timestamp, so
                                // we buffer it until we receive the proper 
watermark.
 
                                saveRegisterWatermarkTimer();
 
-                               PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+                               List<IN> elementsForTimestamp =  
elementQueueState.get(timestamp);
+                               if (elementsForTimestamp == null) {
+                                       elementsForTimestamp = new 
ArrayList<>();
+                               }
+
                                if 
(getExecutionConfig().isObjectReuseEnabled()) {
                                        // copy the StreamRecord so that it 
cannot be changed
-                                       priorityQueue.offer(new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp()));
+                                       
elementsForTimestamp.add(inputSerializer.copy(value));
                                } else {
-                                       priorityQueue.offer(element);
+                                       
elementsForTimestamp.add(element.getValue());
                                }
-                               updatePriorityQueue(priorityQueue);
+                               elementQueueState.put(timestamp, 
elementsForTimestamp);
                        }
                }
        }
@@ -218,23 +226,28 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                // 5) update the last seen watermark.
 
                // STEP 1
-               PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+               PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
                NFA<IN> nfa = getNFA();
 
                // STEP 2
-               while (!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= timerService.currentWatermark()) {
-                       StreamRecord<IN> streamRecord = priorityQueue.poll();
-                       processEvent(nfa, streamRecord.getValue(), 
streamRecord.getTimestamp());
+               while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() 
<= timerService.currentWatermark()) {
+                       long timestamp = sortedTimestamps.poll();
+                       for (IN element: elementQueueState.get(timestamp)) {
+                               processEvent(nfa, element, timestamp);
+                       }
+                       elementQueueState.remove(timestamp);
                }
 
                // STEP 3
                advanceTime(nfa, timerService.currentWatermark());
 
                // STEP 4
-               updatePriorityQueue(priorityQueue);
+               if (sortedTimestamps.isEmpty()) {
+                       elementQueueState.clear();
+               }
                updateNFA(nfa);
 
-               if (!priorityQueue.isEmpty() || !nfa.isEmpty()) {
+               if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
                        saveRegisterWatermarkTimer();
                }
 
@@ -264,17 +277,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                }
        }
 
-       private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws 
IOException {
-               PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueOperatorState.value();
-               return priorityQueue != null ? priorityQueue : 
priorityQueueFactory.createPriorityQueue();
-       }
-
-       private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) 
throws IOException {
-               if (queue.isEmpty()) {
-                       priorityQueueOperatorState.clear();
-               } else {
-                       priorityQueueOperatorState.update(queue);
+       private PriorityQueue<Long> getSortedTimestamps() throws Exception {
+               PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
+               for (Long timestamp: elementQueueState.keys()) {
+                       sortedTimestamps.offer(timestamp);
                }
+               return sortedTimestamps;
        }
 
        /**
@@ -318,6 +326,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                ValueState<NFA<IN>> oldNfaOperatorState = 
getRuntimeContext().getState(
                                new ValueStateDescriptor<>("nfaOperatorState", 
new NFA.Serializer<IN>()));
 
+               ValueState<PriorityQueue<StreamRecord<IN>>> 
oldPriorityQueueOperatorState =
+                               getRuntimeContext().getState(
+                                       new ValueStateDescriptor<>(
+                                                       
"priorityQueueStateName",
+                                                       new 
PriorityQueueSerializer<>(
+                                                                       
((TypeSerializer) new StreamElementSerializer<>(inputSerializer)),
+                                                                       new 
PriorityQueueStreamRecordFactory<IN>()
+                                                       )
+                                       )
+                       );
+
+
                if (migratingFromOldKeyedOperator) {
                        int numberEntries = inputView.readInt();
                        for (int i = 0; i < numberEntries; i++) {
@@ -328,6 +348,30 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                NFA<IN> nfa = oldNfaOperatorState.value();
                                oldNfaOperatorState.clear();
                                nfaOperatorState.update(nfa);
+
+                               PriorityQueue<StreamRecord<IN>> priorityQueue = 
oldPriorityQueueOperatorState.value();
+                               if (priorityQueue != null && 
!priorityQueue.isEmpty()) {
+                                       Map<Long, List<IN>> elementMap = new 
HashMap<>();
+                                       for (StreamRecord<IN> record: 
priorityQueue) {
+                                               long timestamp = 
record.getTimestamp();
+                                               IN element = record.getValue();
+
+                                               List<IN> elements = 
elementMap.get(timestamp);
+                                               if (elements == null) {
+                                                       elements = new 
ArrayList<>();
+                                                       
elementMap.put(timestamp, elements);
+                                               }
+                                               elements.add(element);
+                                       }
+
+                                       // write the old state into the new one.
+                                       for (Map.Entry<Long, List<IN>> entry: 
elementMap.entrySet()) {
+                                               
elementQueueState.put(entry.getKey(), entry.getValue());
+                                       }
+
+                                       // clear the old state
+                                       oldPriorityQueueOperatorState.clear();
+                               }
                        }
                } else {
 
@@ -339,22 +383,35 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
 
                        // retrieve the elements that were pending in the 
priority queue
                        MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
priorityQueueFactory.createPriorityQueue();
+
+                       Map<Long, List<IN>> elementMap = new HashMap<>();
                        int entries = ois.readInt();
                        for (int i = 0; i < entries; i++) {
                                StreamElement streamElement = 
recordSerializer.deserialize(inputView);
-                               
priorityQueue.offer(streamElement.<IN>asRecord());
+                               StreamRecord<IN> record = 
streamElement.<IN>asRecord();
+
+                               long timestamp = record.getTimestamp();
+                               IN element = record.getValue();
+
+                               List<IN> elements = elementMap.get(timestamp);
+                               if (elements == null) {
+                                       elements = new ArrayList<>();
+                                       elementMap.put(timestamp, elements);
+                               }
+                               elements.add(element);
                        }
 
                        // finally register the retrieved state with the new 
keyed state.
                        setCurrentKey((byte) 0);
                        nfaOperatorState.update(nfa);
-                       priorityQueueOperatorState.update(priorityQueue);
+
+                       // write the priority queue to the new map state.
+                       for (Map.Entry<Long, List<IN>> entry: 
elementMap.entrySet()) {
+                               elementQueueState.put(entry.getKey(), 
entry.getValue());
+                       }
 
                        if (!isProcessingTime) {
                                // this is relevant only for event/ingestion 
time
-
-                               // need to work around type restrictions
                                setCurrentKey((byte) 0);
                                saveRegisterWatermarkTimer();
                        }
@@ -546,15 +603,18 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
        }
 
        @VisibleForTesting
-       public boolean hasNonEmptyPQ(KEY key) throws IOException {
+       public boolean hasNonEmptyPQ(KEY key) throws Exception {
                setCurrentKey(key);
-               return priorityQueueOperatorState.value() != null;
+               return elementQueueState.keys().iterator().hasNext();
        }
 
        @VisibleForTesting
-       public int getPQSize(KEY key) throws IOException {
+       public int getPQSize(KEY key) throws Exception {
                setCurrentKey(key);
-               PriorityQueue<StreamRecord<IN>> pq = getPriorityQueue();
-               return pq == null ? -1 : pq.size();
+               int counter = 0;
+               for (List<IN> elements: elementQueueState.values()) {
+                       counter += elements.size();
+               }
+               return counter;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/546e2ad7/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 255b8c2..d575e43 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -87,6 +87,9 @@ public class CEPMigration11to13Test {
                harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
                harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
                harness.processWatermark(new Watermark(2));
+
+               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
                // simulate snapshot/restore with empty element queue but NFA 
state
                StreamTaskState snapshot = harness.snapshot(1, 1);
                FileOutputStream out = new FileOutputStream(
@@ -112,7 +115,6 @@ public class CEPMigration11to13Test {
                
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-1_1-snapshot"));
                harness.open();
 
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
                harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
                harness.processElement(new StreamRecord<>(endEvent, 5));
 
@@ -206,6 +208,8 @@ public class CEPMigration11to13Test {
                harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
                harness.processWatermark(new Watermark(2));
 
+               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
+
                // simulate snapshot/restore with empty element queue but NFA 
state
                StreamTaskState snapshot = harness.snapshot(1, 1);
                FileOutputStream out = new FileOutputStream(
@@ -233,7 +237,6 @@ public class CEPMigration11to13Test {
                
harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-non-keyed-1.1-snapshot"));
                harness.open();
 
-               harness.processElement(new StreamRecord<Event>(middleEvent, 3));
                harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
                harness.processElement(new StreamRecord<>(endEvent, 5));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/546e2ad7/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 95e3a37..ab63479 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -140,8 +139,6 @@ public class CEPOperatorTest extends TestLogger {
        }
 
        @Test
-       @Ignore
-       // TODO: 5/19/17 Re-instate when checkpoints are fixed
        public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws 
Exception {
 
                String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
@@ -306,11 +303,11 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
-               harness.processElement(new StreamRecord<>(startEvent1, 1L));
-               harness.processElement(new StreamRecord<>(startEventK2, 1L));
                harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
                harness.processElement(new StreamRecord<Event>(middleEvent1, 
2L));
                harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3L));
+               harness.processElement(new StreamRecord<>(startEvent1, 1L));
+               harness.processElement(new StreamRecord<>(startEventK2, 1L));
 
                // there must be 2 keys 42, 43 registered for the watermark 
callback
                // all the seen elements must be in the priority queues but no 
NFA yet.
@@ -404,13 +401,13 @@ public class CEPOperatorTest extends TestLogger {
 
                harness.processWatermark(new Watermark(Long.MIN_VALUE));
 
+               harness.processElement(new StreamRecord<>(middle2Event1, 6));
+               harness.processElement(new StreamRecord<>(middle1Event3, 7));
                harness.processElement(new StreamRecord<>(startEvent, 1));
                harness.processElement(new StreamRecord<>(middle1Event1, 3));
-               harness.processElement(new StreamRecord<>(middle1Event1, 3)); 
// this and the following get reordered
                harness.processElement(new StreamRecord<>(middle1Event2, 3));
+               harness.processElement(new StreamRecord<>(middle1Event1, 3));
                harness.processElement(new StreamRecord<>(new Event(41, "d", 
6.0), 5));
-               harness.processElement(new StreamRecord<>(middle2Event1, 6));
-               harness.processElement(new StreamRecord<>(middle1Event3, 7));
 
                assertEquals(1L, harness.numEventTimeTimers());
                assertEquals(7L, operator.getPQSize(41));
@@ -554,8 +551,6 @@ public class CEPOperatorTest extends TestLogger {
        }
 
        @Test
-       @Ignore
-       // TODO: 5/19/17 Re-instate when checkpoints are fixed
        public void testCEPOperatorSerializationWRocksDB() throws Exception {
                String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
@@ -626,13 +621,13 @@ public class CEPOperatorTest extends TestLogger {
                harness.processElement(new StreamRecord<>(startEvent1, 1));
                harness.processElement(new StreamRecord<Event>(middleEvent1, 
2));
                harness.processWatermark(2L);
+               harness.processElement(new StreamRecord<Event>(middleEvent3, 
5));
                harness.processElement(new StreamRecord<Event>(middleEvent2, 
3));
                harness.processElement(new StreamRecord<>(startEvent2, 4));
-               harness.processElement(new StreamRecord<Event>(middleEvent3, 
5));
                harness.processWatermark(5L);
-               harness.processElement(new StreamRecord<Event>(middleEvent4, 
5));
                harness.processElement(new StreamRecord<>(nextOne, 6));
                harness.processElement(new StreamRecord<>(endEvent, 8));
+               harness.processElement(new StreamRecord<Event>(middleEvent4, 
5));
                harness.processWatermark(100L);
 
                List<List<Event>> resultingPatterns = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/546e2ad7/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot 
b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot
index 277de1d..75655c6 100644
Binary files 
a/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot and 
b/flink-libraries/flink-cep/src/test/resources/cep-keyed-1_1-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/546e2ad7/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot 
b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot
index b5ca51e..68ca0ec 100644
Binary files 
a/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot and 
b/flink-libraries/flink-cep/src/test/resources/cep-non-keyed-1.1-snapshot differ

Reply via email to