[FLINK-5969] Add CEPFrom12MigrationTest

The binary snapshots have been created on the Flink 1.2 branch.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/852a710b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/852a710b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/852a710b Branch: refs/heads/release-1.2 Commit: 852a710b4d91bdd319238c87d227c51a904070a7 Parents: 53432e0 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Apr 28 12:28:50 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 13:50:05 2017 +0200 ---------------------------------------------------------------------- .../cep/operator/CEPFrom12MigrationTest.java | 480 +++++++++++++++++++ ...-migration-after-branching-flink1.2-snapshot | Bin 0 -> 5580 bytes ...-single-pattern-afterwards-flink1.2-snapshot | Bin 0 -> 2326 bytes ...ation-starting-new-pattern-flink1.2-snapshot | Bin 0 -> 5389 bytes 4 files changed, 480 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java new file mode 100644 index 0000000..9a15754 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for checking whether CEP operator can restore from snapshots that were done + * using the Flink 1.2 operator. 
+ * + * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on + * the Flink 1.2 branch. + */ + +public class CEPFrom12MigrationTest { + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writAfterBranchingPatternSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.open(); + + harness.processElement(new StreamRecord<Event>(startEvent, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); + harness.processElement(new StreamRecord<Event>(middleEvent2, 3)); + + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-after-branching-flink1.2-snapshot"); + + harness.close(); + } + + @Test + public void testRestoreAfterBranchingPattern() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = 
-4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink1.2-snapshot"))); + harness.open(); + + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<>(endEvent, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and 2 results + assertEquals(3, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue(); + + assertEquals(startEvent, patternMap1.get("start")); + assertEquals(middleEvent1, patternMap1.get("middle")); + assertEquals(endEvent, patternMap1.get("end")); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap2 = (Map<String, 
Event>) resultRecord2.getValue(); + + assertEquals(startEvent, patternMap2.get("start")); + assertEquals(middleEvent2, patternMap2.get("middle")); + assertEquals(endEvent, patternMap2.get("end")); + + harness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeStartingNewPatternAfterMigrationSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.open(); + harness.processElement(new StreamRecord<Event>(startEvent1, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot"); + + harness.close(); + } + + @Test + public void testRestoreStartingNewPatternAfterMigration() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer 
getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final Event startEvent2 = new Event(42, "start", 5.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink1.2-snapshot"))); + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent2, 5)); + harness.processElement(new StreamRecord<Event>(middleEvent2, 6)); + harness.processElement(new StreamRecord<>(endEvent, 7)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and 3 results + assertEquals(4, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + Object resultObject3 = result.poll(); + assertTrue(resultObject3 instanceof StreamRecord); + StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; + assertTrue(resultRecord3.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap1 = (Map<String, 
Event>) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start")); + assertEquals(middleEvent1, patternMap1.get("middle")); + assertEquals(endEvent, patternMap1.get("end")); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue(); + + assertEquals(startEvent1, patternMap2.get("start")); + assertEquals(middleEvent2, patternMap2.get("middle")); + assertEquals(endEvent, patternMap2.get("end")); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue(); + + assertEquals(startEvent2, patternMap3.get("start")); + assertEquals(middleEvent2, patternMap3.get("middle")); + assertEquals(endEvent, patternMap3.get("end")); + + harness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeSinglePatternAfterMigrationSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new SinglePatternNFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.open(); + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot"); + + harness.close(); + } + + + @Test + public void testSinglePatternAfterMigration() throws Exception { + + 
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new SinglePatternNFAFactory()), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink1.2-snapshot"))); + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent1, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue(); + + assertEquals(startEvent1, patternMap.get("start")); + + harness.close(); + } + + private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private SinglePatternNFAFactory() { + this(false); + } + + private SinglePatternNFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) + 
.within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class NFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private NFAFactory() { + this(false); + } + + private NFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) + .followedBy("middle") + .subtype(SubEvent.class) + .where(new MiddleFilter()) + .followedBy("end") + .where(new EndFilter()) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class StartFilter implements FilterFunction<Event> { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + } + + private static class MiddleFilter implements FilterFunction<SubEvent> { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + } + + private static class EndFilter implements FilterFunction<Event> { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot ---------------------------------------------------------------------- diff 
--git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot new file mode 100644 index 0000000..6775f2a Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot new file mode 100644 index 0000000..f63b7dd Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/852a710b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot new file mode 100644 index 0000000..8e0fd27 Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot differ