Repository: flink Updated Branches: refs/heads/master bcaf816dc -> 5d3506e88
[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8 Branch: refs/heads/master Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b Parents: bcaf816 Author: kkloudas <kklou...@gmail.com> Authored: Wed May 31 17:48:34 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Thu Jun 8 15:41:38 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++++++++++--------- .../org/apache/flink/cep/nfa/SharedBuffer.java | 26 +++--- .../flink/cep/nfa/compiler/NFACompiler.java | 45 ++-------- .../cep/nfa/compiler/NFAStateNameHandler.java | 79 ++++++++++++++++++ .../org/apache/flink/cep/nfa/NFAITCase.java | 64 ++++++++++++++ .../apache/flink/cep/nfa/SharedBufferTest.java | 73 +++++++++------- .../flink/cep/nfa/compiler/NFACompilerTest.java | 2 +- 7 files changed, 255 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index f438915..cac1601 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; -import com.google.common.collect.ListMultimap; import javax.annotation.Nullable; @@ -231,7 +231,7 @@ public class NFA<T> implements Serializable { } eventSharedBuffer.release( - computationState.getPreviousState().getName(), + NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()), computationState.getEvent(), computationState.getTimestamp(), computationState.getCounter()); @@ -248,6 +248,7 @@ public class NFA<T> implements Serializable { //if stop state reached in this path boolean shouldDiscardPath = false; for (final ComputationState<T> newComputationState: newComputationStates) { + if (newComputationState.isFinalState()) { // we've reached a final state and can thus retrieve the matching event sequence Map<String, List<T>> matchedPattern = extractCurrentMatches(newComputationState); @@ -255,7 +256,8 @@ public class NFA<T> implements Serializable { // remove found patterns because they are no longer needed eventSharedBuffer.release( - newComputationState.getPreviousState().getName(), + NFAStateNameHandler.getOriginalNameFromInternal( + newComputationState.getPreviousState().getName()), newComputationState.getEvent(), newComputationState.getTimestamp(), computationState.getCounter()); @@ -263,10 +265,11 @@ public class NFA<T> implements Serializable { //reached stop state. release entry for the stop state shouldDiscardPath = true; eventSharedBuffer.release( - newComputationState.getPreviousState().getName(), - newComputationState.getEvent(), - newComputationState.getTimestamp(), - computationState.getCounter()); + NFAStateNameHandler.getOriginalNameFromInternal( + newComputationState.getPreviousState().getName()), + newComputationState.getEvent(), + newComputationState.getTimestamp(), + computationState.getCounter()); } else { // add new computation state; it will be processed once the next event arrives statesToRetain.add(newComputationState); @@ -278,10 +281,11 @@ public class NFA<T> implements Serializable { // the buffer for (final ComputationState<T> state : statesToRetain) { eventSharedBuffer.release( - state.getPreviousState().getName(), - state.getEvent(), - state.getTimestamp(), - state.getCounter()); + NFAStateNameHandler.getOriginalNameFromInternal( + state.getPreviousState().getName()), + state.getEvent(), + state.getTimestamp(), + state.getCounter()); } } else { computationStates.addAll(statesToRetain); @@ -473,17 +477,20 @@ public class NFA<T> implements Serializable { if (computationState.isStartState()) { startTimestamp = timestamp; counter = eventSharedBuffer.put( - currentState.getName(), + NFAStateNameHandler.getOriginalNameFromInternal( + currentState.getName()), event, timestamp, currentVersion); } else { startTimestamp = computationState.getStartTimestamp(); counter = eventSharedBuffer.put( - currentState.getName(), + NFAStateNameHandler.getOriginalNameFromInternal( + currentState.getName()), event, timestamp, - previousState.getName(), + NFAStateNameHandler.getOriginalNameFromInternal( + previousState.getName()), previousEvent, computationState.getTimestamp(), computationState.getCounter(), @@ -530,10 +537,11 @@ public class NFA<T> implements Serializable { if (computationState.getEvent() != null) { // release the shared entry referenced by the current computation state. eventSharedBuffer.release( - computationState.getPreviousState().getName(), - computationState.getEvent(), - computationState.getTimestamp(), - computationState.getCounter()); + NFAStateNameHandler.getOriginalNameFromInternal( + computationState.getPreviousState().getName()), + computationState.getEvent(), + computationState.getTimestamp(), + computationState.getCounter()); } return resultingComputationStates; @@ -551,7 +559,9 @@ public class NFA<T> implements Serializable { ComputationState<T> computationState = ComputationState.createState( this, currentState, previousState, event, counter, timestamp, version, startTimestamp); computationStates.add(computationState); - eventSharedBuffer.lock(previousState.getName(), event, timestamp, counter); + + String originalStateName = NFAStateNameHandler.getOriginalNameFromInternal(previousState.getName()); + eventSharedBuffer.lock(originalStateName, event, timestamp, counter); } private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) { @@ -641,32 +651,34 @@ public class NFA<T> implements Serializable { eventSerializer = nonDuplicatingTypeSerializer.getTypeSerializer(); } - Collection<ListMultimap<String, T>> paths = eventSharedBuffer.extractPatterns( - computationState.getPreviousState().getName(), + List<Map<String, List<T>>> paths = eventSharedBuffer.extractPatterns( + NFAStateNameHandler.getOriginalNameFromInternal( + computationState.getPreviousState().getName()), computationState.getEvent(), computationState.getTimestamp(), computationState.getCounter(), computationState.getVersion()); + if (paths.isEmpty()) { + return new HashMap<>(); + } // for a given computation state, we cannot have more than one matching patterns. - Preconditions.checkState(paths.size() <= 1); + Preconditions.checkState(paths.size() == 1); Map<String, List<T>> result = new HashMap<>(); - for (ListMultimap<String, T> path: paths) { - for (String key: path.keySet()) { - List<T> events = path.get(key); - - String originalKey = NFACompiler.getOriginalStateNameFromInternal(key); - List<T> values = result.get(originalKey); - if (values == null) { - values = new ArrayList<>(events.size()); - } + Map<String, List<T>> path = paths.get(0); + for (String key: path.keySet()) { + List<T> events = path.get(key); + + List<T> values = result.get(key); + if (values == null) { + values = new ArrayList<>(events.size()); + result.put(key, values); + } - for (T event: events) { - // copy the element so that the user can change it - values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event)); - } - result.put(originalKey, values); + for (T event: events) { + // copy the element so that the user can change it + values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event)); } } return result; @@ -871,10 +883,6 @@ public class NFA<T> implements Serializable { return null; } - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ois.defaultReadObject(); - } - @Override public NFA<T> copy(NFA<T> from) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index a44b333..d592c65 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -11,7 +11,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WIVHOUV WARRANVIES OR CONDIVIONS OF ANY KIND, either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ @@ -34,8 +34,6 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; import org.apache.commons.lang3.StringUtils; import java.io.ByteArrayInputStream; @@ -217,14 +215,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { * @param version Version of the previous relation which shall be extracted * @return Collection of previous relations starting with the given value */ - public Collection<ListMultimap<K, V>> extractPatterns( + public List<Map<K, List<V>>> extractPatterns( final K key, final V value, final long timestamp, final int counter, final DeweyNumber version) { - Collection<ListMultimap<K, V>> result = new ArrayList<>(); + List<Map<K, List<V>>> result = new ArrayList<>(); // stack to remember the current extraction states Stack<ExtractionState<K, V>> extractionStates = new Stack<>(); @@ -244,12 +242,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { // termination criterion if (currentEntry == null) { - final ListMultimap<K, V> completePath = ArrayListMultimap.create(); + final Map<K, List<V>> completePath = new HashMap<>(); while (!currentPath.isEmpty()) { final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop(); - completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue()); + K k = currentPathEntry.getKey(); + List<V> values = completePath.get(k); + if (values == null) { + values = new ArrayList<>(); + completePath.put(k, values); + } + values.add(currentPathEntry.getValueTime().getValue()); } result.add(completePath); @@ -777,12 +781,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ExtractionState( final SharedBufferEntry<K, V> entry, - final DeweyNumber version) { - this(entry, version, null); - } - - ExtractionState( - final SharedBufferEntry<K, V> entry, final DeweyNumber version, final Stack<SharedBufferEntry<K, V>> path) { this.entry = entry; @@ -942,7 +940,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime(); - valueSerializer.serialize(valueTimeWrapper.value, target); + valueSerializer.serialize(valueTimeWrapper.getValue(), target); target.writeLong(valueTimeWrapper.getTimestamp()); target.writeInt(valueTimeWrapper.getCounter()); http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 8d1d366..ce42acd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -32,7 +32,6 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.NotCondition; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; @@ -44,10 +43,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a @@ -57,8 +54,6 @@ public class NFACompiler { protected static final String ENDING_STATE_NAME = "$endState$"; - protected static final String STATE_NAME_DELIM = ":"; - /** * Compiles the given pattern into a {@link NFA}. * @@ -77,11 +72,6 @@ public class NFACompiler { return factory.createNFA(); } - public static String getOriginalStateNameFromInternal(String internalName) { - Preconditions.checkNotNull(internalName); - return internalName.split(STATE_NAME_DELIM)[0]; - } - /** * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create * multiple NFAs. @@ -115,7 +105,7 @@ public class NFACompiler { */ static class NFAFactoryCompiler<T> { - private final Set<String> usedNames = new HashSet<>(); + private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler(); private final Map<String, State<T>> stopStates = new HashMap<>(); private final List<State<T>> states = new ArrayList<>(); @@ -207,7 +197,8 @@ public class NFACompiler { if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) { //skip notFollow patterns, they are converted into edge conditions } else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) { - checkPatternNameUniqueness(currentPattern.getName()); + stateNameHandler.checkNameUniqueness(currentPattern.getName()); + final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal); final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition(); final State<T> stopState = createStopState(notCondition, currentPattern.getName()); @@ -221,7 +212,7 @@ public class NFACompiler { notNext.addProceed(stopState, notCondition); lastSink = notNext; } else { - checkPatternNameUniqueness(currentPattern.getName()); + stateNameHandler.checkNameUniqueness(currentPattern.getName()); lastSink = convertPattern(lastSink); } @@ -246,7 +237,7 @@ public class NFACompiler { */ @SuppressWarnings("unchecked") private State<T> createStartState(State<T> sinkState) { - checkPatternNameUniqueness(currentPattern.getName()); + stateNameHandler.checkNameUniqueness(currentPattern.getName()); final State<T> beginningState = convertPattern(sinkState); beginningState.makeStart(); return beginningState; @@ -284,36 +275,12 @@ public class NFACompiler { * @return the created state */ private State<T> createState(String name, State.StateType stateType) { - String stateName = getUniqueInternalStateName(name); - usedNames.add(stateName); + String stateName = stateNameHandler.getUniqueInternalName(name); State<T> state = new State<>(stateName, stateType); states.add(state); return state; } - /** - * Used to give a unique name to states created - * during the translation process. - * - * @param baseName The base of the name. - */ - private String getUniqueInternalStateName(String baseName) { - int counter = 0; - String candidate = baseName; - while (usedNames.contains(candidate)) { - candidate = baseName + STATE_NAME_DELIM + counter++; - } - return candidate; - } - - private void checkPatternNameUniqueness(String patternName) { - if (usedNames.contains(patternName)) { - throw new MalformedPatternException( - "Duplicate pattern name: " + patternName + ". " + - "Pattern names must be unique."); - } - } - private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) { // We should not duplicate the notStates. All states from which we can stop should point to the same one. State<T> stopState = stopStates.get(name); http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java new file mode 100644 index 0000000..558b6f4 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa.compiler; + +import org.apache.flink.cep.pattern.MalformedPatternException; +import org.apache.flink.util.Preconditions; + +import java.util.HashSet; +import java.util.Set; + +/** + * A utility class used to handle name conventions and guarantee unique + * names for the states of our {@link org.apache.flink.cep.nfa.NFA}. + */ +public class NFAStateNameHandler { + + private static final String STATE_NAME_DELIM = ":"; + + private final Set<String> usedNames = new HashSet<>(); + + /** + * Implements the reverse process of the {@link #getUniqueInternalName(String)}. + * + * @param internalName The name to be decoded. + * @return The original, user-specified name for the state. + */ + public static String getOriginalNameFromInternal(String internalName) { + Preconditions.checkNotNull(internalName); + return internalName.split(STATE_NAME_DELIM)[0]; + } + + /** + * Checks if the given name is already used or not. If yes, it + * throws a {@link MalformedPatternException}. + * + * @param name The name to be checked. + */ + public void checkNameUniqueness(String name) { + if (usedNames.contains(name)) { + throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique."); + } + } + + /** + * Used to give a unique name to {@link org.apache.flink.cep.nfa.NFA} states + * created during the translation process. The name format will be + * {@code baseName:counter} , where the counter is increasing for states with + * the same {@code baseName}. + * + * @param baseName The base of the name. + * @return The (unique) name that is going to be used internally for the state. + */ + public String getUniqueInternalName(String baseName) { + int counter = 0; + String candidate = baseName; + while (usedNames.contains(candidate)) { + candidate = baseName + STATE_NAME_DELIM + counter++; + } + usedNames.add(candidate); + return candidate; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 92b49d3..20cb482 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -2622,4 +2623,67 @@ public class NFAITCase extends TestLogger { Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end) )); } + + @Test + public void testNFAResultOrdering() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent1 = new Event(41, "a-1", 2.0); + Event startEvent2 = new Event(41, "a-2", 3.0); + Event startEvent3 = new Event(41, "a-3", 4.0); + Event startEvent4 = new Event(41, "a-4", 5.0); + Event endEvent1 = new Event(41, "b-1", 6.0); + Event endEvent2 = new Event(41, "b-2", 7.0); + Event endEvent3 = new Event(41, "b-3", 8.0); + + inputEvents.add(new StreamRecord<>(startEvent1, 1)); + inputEvents.add(new StreamRecord<>(startEvent2, 3)); + inputEvents.add(new StreamRecord<>(startEvent3, 4)); + inputEvents.add(new StreamRecord<>(startEvent4, 5)); + inputEvents.add(new StreamRecord<>(endEvent1, 6)); + inputEvents.add(new StreamRecord<>(endEvent2, 7)); + inputEvents.add(new StreamRecord<>(endEvent3, 10)); + + Pattern<Event, ?> pattern = Pattern + .<Event>begin("start") + .where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 6452194090480345053L; + + @Override + public boolean filter(Event s) throws Exception { + return s.getName().startsWith("a-"); + } + }).times(4).allowCombinations() + .followedByAny("middle") + .where(new SimpleCondition<Event>() { + private static final long serialVersionUID = -6838398439317275390L; + + public boolean filter(Event s) throws Exception { + return s.getName().startsWith("b-"); + } + }).times(3).consecutive(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, List<Event>>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + resultingPatterns.addAll(patterns); + } + + Assert.assertEquals(1L, resultingPatterns.size()); + + Map<String, List<Event>> match = resultingPatterns.get(0); + Assert.assertArrayEquals( + match.get("start").toArray(), + Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray()); + + Assert.assertArrayEquals( + match.get("middle").toArray(), + Lists.newArrayList(endEvent1, endEvent2, endEvent3).toArray()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index 44033c1..3621bad 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -24,15 +24,17 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.TestLogger; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -54,28 +56,43 @@ public class SharedBufferTest extends TestLogger { events[i] = new Event(i + 1, "e" + (i + 1), i); } - ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create(); - expectedPattern1.put("a1", events[2]); - expectedPattern1.put("a[]", events[3]); - expectedPattern1.put("b", events[5]); - - ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create(); - expectedPattern2.put("a1", events[0]); - expectedPattern2.put("a[]", events[1]); - expectedPattern2.put("a[]", events[2]); - expectedPattern2.put("a[]", events[3]); - expectedPattern2.put("a[]", events[4]); - expectedPattern2.put("b", events[5]); - - ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create(); - expectedPattern3.put("a1", events[0]); - expectedPattern3.put("a[]", events[1]); - expectedPattern3.put("a[]", events[2]); - expectedPattern3.put("a[]", events[3]); - expectedPattern3.put("a[]", events[4]); - expectedPattern3.put("a[]", events[5]); - expectedPattern3.put("a[]", events[6]); - expectedPattern3.put("b", events[7]); + Map<String, List<Event>> expectedPattern1 = new HashMap<>(); + expectedPattern1.put("a1", new ArrayList<Event>()); + expectedPattern1.get("a1").add(events[2]); + + expectedPattern1.put("a[]", new ArrayList<Event>()); + expectedPattern1.get("a[]").add(events[3]); + + expectedPattern1.put("b", new ArrayList<Event>()); + expectedPattern1.get("b").add(events[5]); + + Map<String, List<Event>> expectedPattern2 = new HashMap<>(); + expectedPattern2.put("a1", new ArrayList<Event>()); + expectedPattern2.get("a1").add(events[0]); + + expectedPattern2.put("a[]", new ArrayList<Event>()); + expectedPattern2.get("a[]").add(events[1]); + expectedPattern2.get("a[]").add(events[2]); + expectedPattern2.get("a[]").add(events[3]); + expectedPattern2.get("a[]").add(events[4]); + + expectedPattern2.put("b", new ArrayList<Event>()); + expectedPattern2.get("b").add(events[5]); + + Map<String, List<Event>> expectedPattern3 = new HashMap<>(); + expectedPattern3.put("a1", new ArrayList<Event>()); + expectedPattern3.get("a1").add(events[0]); + + expectedPattern3.put("a[]", new ArrayList<Event>()); + expectedPattern3.get("a[]").add(events[1]); + expectedPattern3.get("a[]").add(events[2]); + expectedPattern3.get("a[]").add(events[3]); + expectedPattern3.get("a[]").add(events[4]); + expectedPattern3.get("a[]").add(events[5]); + expectedPattern3.get("a[]").add(events[6]); + + expectedPattern3.put("b", new ArrayList<Event>()); + expectedPattern3.get("b").add(events[7]); sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1")); sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0")); @@ -90,12 +107,12 @@ public class SharedBufferTest extends TestLogger { sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1")); sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0")); - Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0")); + Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0")); sharedBuffer.release("b", events[7], timestamp, 7); - Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0")); + Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0")); - Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0")); - Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0")); + Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0")); + Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0")); sharedBuffer.release("b", events[5], timestamp, 2); sharedBuffer.release("b", events[5], timestamp, 5); http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java index cd12071..6d4329a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java @@ -80,7 +80,7 @@ public class NFACompilerTest extends TestLogger { // adjust the rule expectedException.expect(MalformedPatternException.class); - expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique."); + expectedException.expectMessage("Duplicate pattern name: start. Names must be unique."); Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter()) .followedBy("middle").where(new TestFilter())