[FLINK-5969] Add CEPFrom12MigrationTest

The binary snapshots have been created on the Flink 1.2 branch.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/852a710b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/852a710b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/852a710b

Branch: refs/heads/release-1.2
Commit: 852a710b4d91bdd319238c87d227c51a904070a7
Parents: 53432e0
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Apr 28 12:28:50 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed May 3 13:50:05 2017 +0200

----------------------------------------------------------------------
 .../cep/operator/CEPFrom12MigrationTest.java    | 480 +++++++++++++++++++
 ...-migration-after-branching-flink1.2-snapshot | Bin 0 -> 5580 bytes
 ...-single-pattern-afterwards-flink1.2-snapshot | Bin 0 -> 2326 bytes
 ...ation-starting-new-pattern-flink1.2-snapshot | Bin 0 -> 5389 bytes
 4 files changed, 480 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
new file mode 100644
index 0000000..9a15754
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for checking whether the CEP operator can restore from snapshots that were taken
+ * using the Flink 1.2 operator.
+ *
+ * <p>To regenerate the binary snapshot files, run the {@code write*()} methods on
+ * the Flink 1.2 branch.
+ */
+
+public class CEPFrom12MigrationTest {
+
+       /**
+        * Manually run this on the Flink 1.2 branch to write the binary snapshot read by {@link #testRestoreAfterBranchingPattern()}.
+        */
+       @Ignore
+       @Test
+       public void writAfterBranchingPatternSnapshot() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+               final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new NFAFactory()),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               harness.processElement(new StreamRecord<Event>(startEvent, 1));
+               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
2));
+               harness.processElement(new StreamRecord<Event>(middleEvent2, 
3));
+
+               harness.processWatermark(new Watermark(5));
+
+               // do snapshot and save to file
+               OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+               OperatorSnapshotUtil.writeStateHandle(snapshot, 
"src/test/resources/cep-migration-after-branching-flink1.2-snapshot");
+
+               harness.close();
+       }
+
+       @Test
+       public void testRestoreAfterBranchingPattern() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+               final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
+               final Event endEvent = new Event(42, "end", 1.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               keySelector,
+                                                               
IntSerializer.INSTANCE,
+                                                               new 
NFAFactory()),
+                                               keySelector,
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(
+                               OperatorSnapshotUtil.readStateHandle(
+                                               
OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot")));
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(new Event(42, 
"start", 1.0), 4));
+               harness.processElement(new StreamRecord<>(endEvent, 5));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and 2 results
+               assertEquals(3, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               Object resultObject2 = result.poll();
+               assertTrue(resultObject2 instanceof StreamRecord);
+               StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+               assertTrue(resultRecord2.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+
+               assertEquals(startEvent, patternMap1.get("start"));
+               assertEquals(middleEvent1, patternMap1.get("middle"));
+               assertEquals(endEvent, patternMap1.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+
+               assertEquals(startEvent, patternMap2.get("start"));
+               assertEquals(middleEvent2, patternMap2.get("middle"));
+               assertEquals(endEvent, patternMap2.get("end"));
+
+               harness.close();
+       }
+
+       /**
+        * Manually run this on the Flink 1.2 branch to write the binary snapshot read by {@link #testRestoreStartingNewPatternAfterMigration()}.
+        */
+       @Ignore
+       @Test
+       public void writeStartingNewPatternAfterMigrationSnapshot() throws 
Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new NFAFactory()),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+               harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+               harness.processElement(new StreamRecord<Event>(new Event(42, 
"foobar", 1.0), 2));
+               harness.processElement(new StreamRecord<Event>(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+               harness.processElement(new StreamRecord<Event>(middleEvent1, 
2));
+               harness.processWatermark(new Watermark(5));
+
+               // do snapshot and save to file
+               OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+               OperatorSnapshotUtil.writeStateHandle(snapshot, 
"src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot");
+
+               harness.close();
+       }
+
+       @Test
+       public void testRestoreStartingNewPatternAfterMigration() throws 
Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+               final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 
10.0);
+               final Event startEvent2 = new Event(42, "start", 5.0);
+               final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 
10.0);
+               final Event endEvent = new Event(42, "end", 1.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               keySelector,
+                                                               
IntSerializer.INSTANCE,
+                                                               new 
NFAFactory()),
+                                               keySelector,
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(
+                               OperatorSnapshotUtil.readStateHandle(
+                                               
OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink1.2-snapshot")));
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(startEvent2, 5));
+               harness.processElement(new StreamRecord<Event>(middleEvent2, 
6));
+               harness.processElement(new StreamRecord<>(endEvent, 7));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and 3 results
+               assertEquals(4, result.size());
+
+               Object resultObject1 = result.poll();
+               assertTrue(resultObject1 instanceof StreamRecord);
+               StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+               assertTrue(resultRecord1.getValue() instanceof Map);
+
+               Object resultObject2 = result.poll();
+               assertTrue(resultObject2 instanceof StreamRecord);
+               StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+               assertTrue(resultRecord2.getValue() instanceof Map);
+
+               Object resultObject3 = result.poll();
+               assertTrue(resultObject3 instanceof StreamRecord);
+               StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+               assertTrue(resultRecord3.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap1 = (Map<String, Event>) 
resultRecord1.getValue();
+
+               assertEquals(startEvent1, patternMap1.get("start"));
+               assertEquals(middleEvent1, patternMap1.get("middle"));
+               assertEquals(endEvent, patternMap1.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap2 = (Map<String, Event>) 
resultRecord2.getValue();
+
+               assertEquals(startEvent1, patternMap2.get("start"));
+               assertEquals(middleEvent2, patternMap2.get("middle"));
+               assertEquals(endEvent, patternMap2.get("end"));
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap3 = (Map<String, Event>) 
resultRecord3.getValue();
+
+               assertEquals(startEvent2, patternMap3.get("start"));
+               assertEquals(middleEvent2, patternMap3.get("middle"));
+               assertEquals(endEvent, patternMap3.get("end"));
+
+               harness.close();
+       }
+
+       /**
+        * Manually run this on the Flink 1.2 branch to write the binary snapshot read by {@link #testSinglePatternAfterMigration()}.
+        */
+       @Ignore
+       @Test
+       public void writeSinglePatternAfterMigrationSnapshot() throws Exception 
{
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               new KeyedCEPPatternOperator<>(
+                                       Event.createTypeSerializer(),
+                                       false,
+                                       keySelector,
+                                       IntSerializer.INSTANCE,
+                                       new SinglePatternNFAFactory()),
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+               harness.processWatermark(new Watermark(5));
+
+               // do snapshot and save to file
+               OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+               OperatorSnapshotUtil.writeStateHandle(snapshot, 
"src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot");
+
+               harness.close();
+       }
+
+
+       @Test
+       public void testSinglePatternAfterMigration() throws Exception {
+
+               KeySelector<Event, Integer> keySelector = new 
KeySelector<Event, Integer>() {
+                       private static final long serialVersionUID = 
-4873366487571254798L;
+
+                       @Override
+                       public Integer getKey(Event value) throws Exception {
+                               return value.getId();
+                       }
+               };
+
+               final Event startEvent1 = new Event(42, "start", 1.0);
+
+               OneInputStreamOperatorTestHarness<Event, Map<String, Event>> 
harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               new KeyedCEPPatternOperator<>(
+                                                               
Event.createTypeSerializer(),
+                                                               false,
+                                                               keySelector,
+                                                               
IntSerializer.INSTANCE,
+                                                               new 
SinglePatternNFAFactory()),
+                                               keySelector,
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.initializeState(
+                               OperatorSnapshotUtil.readStateHandle(
+                                               
OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink1.2-snapshot")));
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+               harness.processWatermark(new Watermark(20));
+
+               ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+               // watermark and the result
+               assertEquals(2, result.size());
+
+               Object resultObject = result.poll();
+               assertTrue(resultObject instanceof StreamRecord);
+               StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+               assertTrue(resultRecord.getValue() instanceof Map);
+
+               @SuppressWarnings("unchecked")
+               Map<String, Event> patternMap = (Map<String, Event>) 
resultRecord.getValue();
+
+               assertEquals(startEvent1, patternMap.get("start"));
+
+               harness.close();
+       }
+
+       private static class SinglePatternNFAFactory implements 
NFACompiler.NFAFactory<Event> {
+
+               private static final long serialVersionUID = 
1173020762472766713L;
+
+               private final boolean handleTimeout;
+
+               private SinglePatternNFAFactory() {
+                       this(false);
+               }
+
+               private SinglePatternNFAFactory(boolean handleTimeout) {
+                       this.handleTimeout = handleTimeout;
+               }
+
+               @Override
+               public NFA<Event> createNFA() {
+
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new StartFilter())
+                                       .within(Time.milliseconds(10L));
+
+                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), handleTimeout);
+               }
+       }
+
+       private static class NFAFactory implements 
NFACompiler.NFAFactory<Event> {
+
+               private static final long serialVersionUID = 
1173020762472766713L;
+
+               private final boolean handleTimeout;
+
+               private NFAFactory() {
+                       this(false);
+               }
+
+               private NFAFactory(boolean handleTimeout) {
+                       this.handleTimeout = handleTimeout;
+               }
+
+               @Override
+               public NFA<Event> createNFA() {
+
+                       Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new StartFilter())
+                                       .followedBy("middle")
+                                       .subtype(SubEvent.class)
+                                       .where(new MiddleFilter())
+                                       .followedBy("end")
+                                       .where(new EndFilter())
+                                       // add a window timeout to test whether timestamps of elements in the
+                                       // priority queue in CEP operator are correctly checkpointed/restored
+                                       .within(Time.milliseconds(10L));
+
+                       return NFACompiler.compile(pattern, 
Event.createTypeSerializer(), handleTimeout);
+               }
+       }
+
+       private static class StartFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
5726188262756267490L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("start");
+               }
+       }
+
+       private static class MiddleFilter implements FilterFunction<SubEvent> {
+               private static final long serialVersionUID = 
6215754202506583964L;
+
+               @Override
+               public boolean filter(SubEvent value) throws Exception {
+                       return value.getVolume() > 5.0;
+               }
+       }
+
+       private static class EndFilter implements FilterFunction<Event> {
+               private static final long serialVersionUID = 
7056763917392056548L;
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("end");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot
new file mode 100644
index 0000000..6775f2a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot
new file mode 100644
index 0000000..f63b7dd
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot
new file mode 100644
index 0000000..8e0fd27
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot differ

Reply via email to