Repository: flink
Updated Branches:
  refs/heads/master bcaf816dc -> 5d3506e88


[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8

Branch: refs/heads/master
Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b
Parents: bcaf816
Author: kkloudas <kklou...@gmail.com>
Authored: Wed May 31 17:48:34 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Thu Jun 8 15:41:38 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++++++++++---------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 26 +++---
 .../flink/cep/nfa/compiler/NFACompiler.java     | 45 ++--------
 .../cep/nfa/compiler/NFAStateNameHandler.java   | 79 ++++++++++++++++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 64 ++++++++++++++
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 73 +++++++++-------
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  2 +-
 7 files changed, 255 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f438915..cac1601 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
 
 import javax.annotation.Nullable;
 
@@ -231,7 +231,7 @@ public class NFA<T> implements Serializable {
                                }
 
                                eventSharedBuffer.release(
-                                               
computationState.getPreviousState().getName(),
+                                               
NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()),
                                                computationState.getEvent(),
                                                computationState.getTimestamp(),
                                                computationState.getCounter());
@@ -248,6 +248,7 @@ public class NFA<T> implements Serializable {
                        //if stop state reached in this path
                        boolean shouldDiscardPath = false;
                        for (final ComputationState<T> newComputationState: 
newComputationStates) {
+
                                if (newComputationState.isFinalState()) {
                                        // we've reached a final state and can 
thus retrieve the matching event sequence
                                        Map<String, List<T>> matchedPattern = 
extractCurrentMatches(newComputationState);
@@ -255,7 +256,8 @@ public class NFA<T> implements Serializable {
 
                                        // remove found patterns because they 
are no longer needed
                                        eventSharedBuffer.release(
-                                                       
newComputationState.getPreviousState().getName(),
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
newComputationState.getPreviousState().getName()),
                                                        
newComputationState.getEvent(),
                                                        
newComputationState.getTimestamp(),
                                                        
computationState.getCounter());
@@ -263,10 +265,11 @@ public class NFA<T> implements Serializable {
                                        //reached stop state. release entry for 
the stop state
                                        shouldDiscardPath = true;
                                        eventSharedBuffer.release(
-                                               
newComputationState.getPreviousState().getName(),
-                                               newComputationState.getEvent(),
-                                               
newComputationState.getTimestamp(),
-                                               computationState.getCounter());
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
newComputationState.getPreviousState().getName()),
+                                                       
newComputationState.getEvent(),
+                                                       
newComputationState.getTimestamp(),
+                                                       
computationState.getCounter());
                                } else {
                                        // add new computation state; it will 
be processed once the next event arrives
                                        statesToRetain.add(newComputationState);
@@ -278,10 +281,11 @@ public class NFA<T> implements Serializable {
                                // the buffer
                                for (final ComputationState<T> state : 
statesToRetain) {
                                        eventSharedBuffer.release(
-                                               
state.getPreviousState().getName(),
-                                               state.getEvent(),
-                                               state.getTimestamp(),
-                                               state.getCounter());
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
state.getPreviousState().getName()),
+                                                       state.getEvent(),
+                                                       state.getTimestamp(),
+                                                       state.getCounter());
                                }
                        } else {
                                computationStates.addAll(statesToRetain);
@@ -473,17 +477,20 @@ public class NFA<T> implements Serializable {
                                        if (computationState.isStartState()) {
                                                startTimestamp = timestamp;
                                                counter = eventSharedBuffer.put(
-                                                       currentState.getName(),
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
currentState.getName()),
                                                        event,
                                                        timestamp,
                                                        currentVersion);
                                        } else {
                                                startTimestamp = 
computationState.getStartTimestamp();
                                                counter = eventSharedBuffer.put(
-                                                       currentState.getName(),
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
currentState.getName()),
                                                        event,
                                                        timestamp,
-                                                       previousState.getName(),
+                                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                                       
previousState.getName()),
                                                        previousEvent,
                                                        
computationState.getTimestamp(),
                                                        
computationState.getCounter(),
@@ -530,10 +537,11 @@ public class NFA<T> implements Serializable {
                if (computationState.getEvent() != null) {
                        // release the shared entry referenced by the current 
computation state.
                        eventSharedBuffer.release(
-                               computationState.getPreviousState().getName(),
-                               computationState.getEvent(),
-                               computationState.getTimestamp(),
-                               computationState.getCounter());
+                                       
NFAStateNameHandler.getOriginalNameFromInternal(
+                                                       
computationState.getPreviousState().getName()),
+                                       computationState.getEvent(),
+                                       computationState.getTimestamp(),
+                                       computationState.getCounter());
                }
 
                return resultingComputationStates;
@@ -551,7 +559,9 @@ public class NFA<T> implements Serializable {
                ComputationState<T> computationState = 
ComputationState.createState(
                                this, currentState, previousState, event, 
counter, timestamp, version, startTimestamp);
                computationStates.add(computationState);
-               eventSharedBuffer.lock(previousState.getName(), event, 
timestamp, counter);
+
+               String originalStateName = 
NFAStateNameHandler.getOriginalNameFromInternal(previousState.getName());
+               eventSharedBuffer.lock(originalStateName, event, timestamp, 
counter);
        }
 
        private State<T> findFinalStateAfterProceed(State<T> state, T event, 
ComputationState<T> computationState) {
@@ -641,32 +651,34 @@ public class NFA<T> implements Serializable {
                        eventSerializer = 
nonDuplicatingTypeSerializer.getTypeSerializer();
                }
 
-               Collection<ListMultimap<String, T>> paths = 
eventSharedBuffer.extractPatterns(
-                               computationState.getPreviousState().getName(),
+               List<Map<String, List<T>>> paths = 
eventSharedBuffer.extractPatterns(
+                               NFAStateNameHandler.getOriginalNameFromInternal(
+                                               
computationState.getPreviousState().getName()),
                                computationState.getEvent(),
                                computationState.getTimestamp(),
                                computationState.getCounter(),
                                computationState.getVersion());
 
+               if (paths.isEmpty()) {
+                       return new HashMap<>();
+               }
                // for a given computation state, we cannot have more than one 
matching patterns.
-               Preconditions.checkState(paths.size() <= 1);
+               Preconditions.checkState(paths.size() == 1);
 
                Map<String, List<T>> result = new HashMap<>();
-               for (ListMultimap<String, T> path: paths) {
-                       for (String key: path.keySet()) {
-                               List<T> events = path.get(key);
-
-                               String originalKey = 
NFACompiler.getOriginalStateNameFromInternal(key);
-                               List<T> values = result.get(originalKey);
-                               if (values == null) {
-                                       values = new ArrayList<>(events.size());
-                               }
+               Map<String, List<T>> path = paths.get(0);
+               for (String key: path.keySet()) {
+                       List<T> events = path.get(key);
+
+                       List<T> values = result.get(key);
+                       if (values == null) {
+                               values = new ArrayList<>(events.size());
+                               result.put(key, values);
+                       }
 
-                               for (T event: events) {
-                                       // copy the element so that the user 
can change it
-                                       
values.add(eventSerializer.isImmutableType() ? event : 
eventSerializer.copy(event));
-                               }
-                               result.put(originalKey, values);
+                       for (T event: events) {
+                               // copy the element so that the user can change 
it
+                               values.add(eventSerializer.isImmutableType() ? 
event : eventSerializer.copy(event));
                        }
                }
                return result;
@@ -871,10 +883,6 @@ public class NFA<T> implements Serializable {
                        return null;
                }
 
-               private void readObject(ObjectInputStream ois) throws 
IOException, ClassNotFoundException {
-                       ois.defaultReadObject();
-               }
-
                @Override
                public NFA<T> copy(NFA<T> from) {
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a44b333..d592c65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -11,7 +11,7 @@
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
- * WIVHOUV WARRANVIES OR CONDIVIONS OF ANY KIND, either express or implied.
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
@@ -34,8 +34,6 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.ByteArrayInputStream;
@@ -217,14 +215,14 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
         * @param version Version of the previous relation which shall be 
extracted
         * @return Collection of previous relations starting with the given 
value
         */
-       public Collection<ListMultimap<K, V>> extractPatterns(
+       public List<Map<K, List<V>>> extractPatterns(
                        final K key,
                        final V value,
                        final long timestamp,
                        final int counter,
                        final DeweyNumber version) {
 
-               Collection<ListMultimap<K, V>> result = new ArrayList<>();
+               List<Map<K, List<V>>> result = new ArrayList<>();
 
                // stack to remember the current extraction states
                Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
@@ -244,12 +242,18 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
                                // termination criterion
                                if (currentEntry == null) {
-                                       final ListMultimap<K, V> completePath = 
ArrayListMultimap.create();
+                                       final Map<K, List<V>> completePath = 
new HashMap<>();
 
                                        while (!currentPath.isEmpty()) {
                                                final SharedBufferEntry<K, V> 
currentPathEntry = currentPath.pop();
 
-                                               
completePath.put(currentPathEntry.getKey(), 
currentPathEntry.getValueTime().getValue());
+                                               K k = currentPathEntry.getKey();
+                                               List<V> values = 
completePath.get(k);
+                                               if (values == null) {
+                                                       values = new 
ArrayList<>();
+                                                       completePath.put(k, 
values);
+                                               }
+                                               
values.add(currentPathEntry.getValueTime().getValue());
                                        }
 
                                        result.add(completePath);
@@ -777,12 +781,6 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
                ExtractionState(
                                final SharedBufferEntry<K, V> entry,
-                               final DeweyNumber version) {
-                       this(entry, version, null);
-               }
-
-               ExtractionState(
-                               final SharedBufferEntry<K, V> entry,
                                final DeweyNumber version,
                                final Stack<SharedBufferEntry<K, V>> path) {
                        this.entry = entry;
@@ -942,7 +940,7 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
                                        ValueTimeWrapper<V> valueTimeWrapper = 
sharedBuffer.getValueTime();
 
-                                       
valueSerializer.serialize(valueTimeWrapper.value, target);
+                                       
valueSerializer.serialize(valueTimeWrapper.getValue(), target);
                                        
target.writeLong(valueTimeWrapper.getTimestamp());
                                        
target.writeInt(valueTimeWrapper.getCounter());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 8d1d366..ce42acd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
@@ -44,10 +43,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a 
{@link NFA} or a
@@ -57,8 +54,6 @@ public class NFACompiler {
 
        protected static final String ENDING_STATE_NAME = "$endState$";
 
-       protected static final String STATE_NAME_DELIM = ":";
-
        /**
         * Compiles the given pattern into a {@link NFA}.
         *
@@ -77,11 +72,6 @@ public class NFACompiler {
                return factory.createNFA();
        }
 
-       public static String getOriginalStateNameFromInternal(String 
internalName) {
-               Preconditions.checkNotNull(internalName);
-               return internalName.split(STATE_NAME_DELIM)[0];
-       }
-
        /**
         * Compiles the given pattern into a {@link NFAFactory}. The NFA 
factory can be used to create
         * multiple NFAs.
@@ -115,7 +105,7 @@ public class NFACompiler {
         */
        static class NFAFactoryCompiler<T> {
 
-               private final Set<String> usedNames = new HashSet<>();
+               private final NFAStateNameHandler stateNameHandler = new 
NFAStateNameHandler();
                private final Map<String, State<T>> stopStates = new 
HashMap<>();
                private final List<State<T>> states = new ArrayList<>();
 
@@ -207,7 +197,8 @@ public class NFACompiler {
                                if 
(currentPattern.getQuantifier().getConsumingStrategy() == 
Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                                        //skip notFollow patterns, they are 
converted into edge conditions
                                } else if 
(currentPattern.getQuantifier().getConsumingStrategy() == 
Quantifier.ConsumingStrategy.NOT_NEXT) {
-                                       
checkPatternNameUniqueness(currentPattern.getName());
+                                       
stateNameHandler.checkNameUniqueness(currentPattern.getName());
+
                                        final State<T> notNext = 
createState(currentPattern.getName(), State.StateType.Normal);
                                        final IterativeCondition<T> 
notCondition = (IterativeCondition<T>) currentPattern.getCondition();
                                        final State<T> stopState = 
createStopState(notCondition, currentPattern.getName());
@@ -221,7 +212,7 @@ public class NFACompiler {
                                        notNext.addProceed(stopState, 
notCondition);
                                        lastSink = notNext;
                                } else {
-                                       
checkPatternNameUniqueness(currentPattern.getName());
+                                       
stateNameHandler.checkNameUniqueness(currentPattern.getName());
                                        lastSink = convertPattern(lastSink);
                                }
 
@@ -246,7 +237,7 @@ public class NFACompiler {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createStartState(State<T> sinkState) {
-                       checkPatternNameUniqueness(currentPattern.getName());
+                       
stateNameHandler.checkNameUniqueness(currentPattern.getName());
                        final State<T> beginningState = 
convertPattern(sinkState);
                        beginningState.makeStart();
                        return beginningState;
@@ -284,36 +275,12 @@ public class NFACompiler {
                 * @return the created state
                 */
                private State<T> createState(String name, State.StateType 
stateType) {
-                       String stateName = getUniqueInternalStateName(name);
-                       usedNames.add(stateName);
+                       String stateName = 
stateNameHandler.getUniqueInternalName(name);
                        State<T> state = new State<>(stateName, stateType);
                        states.add(state);
                        return state;
                }
 
-               /**
-                * Used to give a unique name to states created
-                * during the translation process.
-                *
-                * @param baseName The base of the name.
-                */
-               private String getUniqueInternalStateName(String baseName) {
-                       int counter = 0;
-                       String candidate = baseName;
-                       while (usedNames.contains(candidate)) {
-                               candidate = baseName + STATE_NAME_DELIM + 
counter++;
-                       }
-                       return candidate;
-               }
-
-               private void checkPatternNameUniqueness(String patternName) {
-                       if (usedNames.contains(patternName)) {
-                               throw new MalformedPatternException(
-                                               "Duplicate pattern name: " + 
patternName + ". " +
-                                                               "Pattern names 
must be unique.");
-                       }
-               }
-
                private State<T> createStopState(final IterativeCondition<T> 
notCondition, final String name) {
                        // We should not duplicate the notStates. All states 
from which we can stop should point to the same one.
                        State<T> stopState = stopStates.get(name);

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
new file mode 100644
index 0000000..558b6f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A utility class used to handle name conventions and guarantee unique
+ * names for the states of our {@link org.apache.flink.cep.nfa.NFA}.
+ */
+public class NFAStateNameHandler {
+
+       private static final String STATE_NAME_DELIM = ":";
+
+       private final Set<String> usedNames = new HashSet<>();
+
+       /**
+        * Implements the reverse process of the {@link 
#getUniqueInternalName(String)}.
+        *
+        * @param internalName The name to be decoded.
+        * @return The original, user-specified name for the state.
+        */
+       public static String getOriginalNameFromInternal(String internalName) {
+               Preconditions.checkNotNull(internalName);
+               return internalName.split(STATE_NAME_DELIM)[0];
+       }
+
+       /**
+        * Checks if the given name is already used or not. If yes, it
+        * throws a {@link MalformedPatternException}.
+        *
+        * @param name The name to be checked.
+        */
+       public void checkNameUniqueness(String name) {
+               if (usedNames.contains(name)) {
+                       throw new MalformedPatternException("Duplicate pattern 
name: " + name + ". Names must be unique.");
+               }
+       }
+
+       /**
+        * Used to give a unique name to {@link org.apache.flink.cep.nfa.NFA} 
states
+        * created during the translation process. The name format will be
+        * {@code baseName:counter} , where the counter is increasing for 
states with
+        * the same {@code baseName}.
+        *
+        * @param baseName The base of the name.
+        * @return The (unique) name that is going to be used internally for 
the state.
+        */
+       public String getUniqueInternalName(String baseName) {
+               int counter = 0;
+               String candidate = baseName;
+               while (usedNames.contains(candidate)) {
+                       candidate = baseName + STATE_NAME_DELIM + counter++;
+               }
+               usedNames.add(candidate);
+               return candidate;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 92b49d3..20cb482 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
 import com.google.common.collect.Lists;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -2622,4 +2623,67 @@ public class NFAITCase extends TestLogger {
                        Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
                        ));
        }
+
+       @Test
+       public void testNFAResultOrdering() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent1 = new Event(41, "a-1", 2.0);
+               Event startEvent2 = new Event(41, "a-2", 3.0);
+               Event startEvent3 = new Event(41, "a-3", 4.0);
+               Event startEvent4 = new Event(41, "a-4", 5.0);
+               Event endEvent1 = new Event(41, "b-1", 6.0);
+               Event endEvent2 = new Event(41, "b-2", 7.0);
+               Event endEvent3 = new Event(41, "b-3", 8.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent1, 1));
+               inputEvents.add(new StreamRecord<>(startEvent2, 3));
+               inputEvents.add(new StreamRecord<>(startEvent3, 4));
+               inputEvents.add(new StreamRecord<>(startEvent4, 5));
+               inputEvents.add(new StreamRecord<>(endEvent1, 6));
+               inputEvents.add(new StreamRecord<>(endEvent2, 7));
+               inputEvents.add(new StreamRecord<>(endEvent3, 10));
+
+               Pattern<Event, ?> pattern = Pattern
+                               .<Event>begin("start")
+                               .where(new SimpleCondition<Event>() {
+                                       private static final long serialVersionUID = 6452194090480345053L;
+
+                                       @Override
+                                       public boolean filter(Event s) throws Exception {
+                                               return s.getName().startsWith("a-");
+                                       }
+                               }).times(4).allowCombinations()
+                               .followedByAny("middle")
+                               .where(new SimpleCondition<Event>() {
+                                       private static final long serialVersionUID = -6838398439317275390L;
+
+                                       public boolean filter(Event s) throws Exception {
+                                               return s.getName().startsWith("b-");
+                                       }
+                               }).times(3).consecutive();
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, List<Event>>> patterns = nfa.process(
+                                       inputEvent.getValue(),
+                                       inputEvent.getTimestamp()).f0;
+
+                       resultingPatterns.addAll(patterns);
+               }
+
+               Assert.assertEquals(1L, resultingPatterns.size());
+
+               Map<String, List<Event>> match = resultingPatterns.get(0);
+               Assert.assertArrayEquals(
+                               match.get("start").toArray(),
+                               Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
+
+               Assert.assertArrayEquals(
+                               match.get("middle").toArray(),
+                               Lists.newArrayList(endEvent1, endEvent2, endEvent3).toArray());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index 44033c1..3621bad 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,28 +56,43 @@ public class SharedBufferTest extends TestLogger {
                        events[i] = new Event(i + 1, "e" + (i + 1), i);
                }
 
-               ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create();
-               expectedPattern1.put("a1", events[2]);
-               expectedPattern1.put("a[]", events[3]);
-               expectedPattern1.put("b", events[5]);
-
-               ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create();
-               expectedPattern2.put("a1", events[0]);
-               expectedPattern2.put("a[]", events[1]);
-               expectedPattern2.put("a[]", events[2]);
-               expectedPattern2.put("a[]", events[3]);
-               expectedPattern2.put("a[]", events[4]);
-               expectedPattern2.put("b", events[5]);
-
-               ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create();
-               expectedPattern3.put("a1", events[0]);
-               expectedPattern3.put("a[]", events[1]);
-               expectedPattern3.put("a[]", events[2]);
-               expectedPattern3.put("a[]", events[3]);
-               expectedPattern3.put("a[]", events[4]);
-               expectedPattern3.put("a[]", events[5]);
-               expectedPattern3.put("a[]", events[6]);
-               expectedPattern3.put("b", events[7]);
+               Map<String, List<Event>> expectedPattern1 = new HashMap<>();
+               expectedPattern1.put("a1", new ArrayList<Event>());
+               expectedPattern1.get("a1").add(events[2]);
+
+               expectedPattern1.put("a[]", new ArrayList<Event>());
+               expectedPattern1.get("a[]").add(events[3]);
+
+               expectedPattern1.put("b", new ArrayList<Event>());
+               expectedPattern1.get("b").add(events[5]);
+
+               Map<String, List<Event>> expectedPattern2 = new HashMap<>();
+               expectedPattern2.put("a1", new ArrayList<Event>());
+               expectedPattern2.get("a1").add(events[0]);
+
+               expectedPattern2.put("a[]", new ArrayList<Event>());
+               expectedPattern2.get("a[]").add(events[1]);
+               expectedPattern2.get("a[]").add(events[2]);
+               expectedPattern2.get("a[]").add(events[3]);
+               expectedPattern2.get("a[]").add(events[4]);
+
+               expectedPattern2.put("b", new ArrayList<Event>());
+               expectedPattern2.get("b").add(events[5]);
+
+               Map<String, List<Event>> expectedPattern3 = new HashMap<>();
+               expectedPattern3.put("a1", new ArrayList<Event>());
+               expectedPattern3.get("a1").add(events[0]);
+
+               expectedPattern3.put("a[]", new ArrayList<Event>());
+               expectedPattern3.get("a[]").add(events[1]);
+               expectedPattern3.get("a[]").add(events[2]);
+               expectedPattern3.get("a[]").add(events[3]);
+               expectedPattern3.get("a[]").add(events[4]);
+               expectedPattern3.get("a[]").add(events[5]);
+               expectedPattern3.get("a[]").add(events[6]);
+
+               expectedPattern3.put("b", new ArrayList<Event>());
+               expectedPattern3.get("b").add(events[7]);
 
                sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
                sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
@@ -90,12 +107,12 @@ public class SharedBufferTest extends TestLogger {
                sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
                sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
 
-               Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+               Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
                sharedBuffer.release("b", events[7], timestamp, 7);
-               Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+               Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
 
-               Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
-               Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
+               Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
+               Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
                sharedBuffer.release("b", events[5], timestamp, 2);
                sharedBuffer.release("b", events[5], timestamp, 5);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index cd12071..6d4329a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -80,7 +80,7 @@ public class NFACompilerTest extends TestLogger {
 
                // adjust the rule
                expectedException.expect(MalformedPatternException.class);
-               expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique.");
+               expectedException.expectMessage("Duplicate pattern name: start. Names must be unique.");
 
                Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
                        .followedBy("middle").where(new TestFilter())

Reply via email to