flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.
Repository: flink Updated Branches: refs/heads/release-1.3 b2d6dc1d4 -> d24515fee [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d24515fe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d24515fe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d24515fe Branch: refs/heads/release-1.3 Commit: d24515fee29fc8924359ab6493e7c8e05ac9b173 Parents: b2d6dc1 Author: kkloudas Authored: Thu Jun 8 16:24:35 2017 +0200 Committer: kkloudas Committed: Thu Jun 8 16:24:35 2017 +0200 -- .../main/java/org/apache/flink/cep/nfa/NFA.java | 138 ++- .../org/apache/flink/cep/nfa/SharedBuffer.java | 49 --- .../flink/cep/nfa/compiler/NFACompiler.java | 74 +++--- .../cep/nfa/compiler/NFAStateNameHandler.java | 79 +++ .../org/apache/flink/cep/nfa/NFAITCase.java | 110 +++ .../apache/flink/cep/nfa/SharedBufferTest.java | 79 +++ .../flink/cep/nfa/compiler/NFACompilerTest.java | 2 +- 7 files changed, 334 insertions(+), 197 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d24515fe/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 2be09ad..cac1601 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -18,9 +18,6 @@ package org.apache.flink.cep.nfa; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterators; -import com.google.common.collect.ListMultimap; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; @@ -29,13 +26,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.EnumSerializer; -import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -44,7 +42,11 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; + import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -68,27 +70,26 @@ import java.util.Stack; /** * Non-deterministic finite automaton implementation. - * - * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator} + * + * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator} * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones. * When an event gets processed, it updates the NFA's internal state machine. - * - * An event that belongs to a partially matched sequence is kept in an internal + * + * An event that belongs to a partially matched sequence is kept in an internal * {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for * this purpose. Events in the buffer are removed when all the matched sequences that * contain them are: * - * emitted (success) - * discarded (patterns containing NOT) - * timed-out (windowed patterns) + * emitted (success) + * discarded (patterns containing NOT) + * timed-out (windowed patterns) * * - * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". - * - * @see https://people.cs.umass.edu/~yanlei/publications/sase-
flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.
Repository: flink Updated Branches: refs/heads/master bcaf816dc -> 5d3506e88 [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8 Branch: refs/heads/master Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b Parents: bcaf816 Author: kkloudas Authored: Wed May 31 17:48:34 2017 +0200 Committer: kkloudas Committed: Thu Jun 8 15:41:38 2017 +0200 -- .../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++- .../org/apache/flink/cep/nfa/SharedBuffer.java | 26 +++--- .../flink/cep/nfa/compiler/NFACompiler.java | 45 ++ .../cep/nfa/compiler/NFAStateNameHandler.java | 79 ++ .../org/apache/flink/cep/nfa/NFAITCase.java | 64 ++ .../apache/flink/cep/nfa/SharedBufferTest.java | 73 +--- .../flink/cep/nfa/compiler/NFACompilerTest.java | 2 +- 7 files changed, 255 insertions(+), 122 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index f438915..cac1601 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; -import com.google.common.collect.ListMultimap; import javax.annotation.Nullable; @@ -231,7 +231,7 @@ public class NFA implements Serializable { } eventSharedBuffer.release( - computationState.getPreviousState().getName(), + NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()), computationState.getEvent(), computationState.getTimestamp(), computationState.getCounter()); @@ -248,6 +248,7 @@ public class NFA implements Serializable { //if stop state reached in this path boolean shouldDiscardPath = false; for (final ComputationState newComputationState: newComputationStates) { + if (newComputationState.isFinalState()) { // we've reached a final state and can thus retrieve the matching event sequence Map> matchedPattern = extractCurrentMatches(newComputationState); @@ -255,7 +256,8 @@ public class NFA implements Serializable { // remove found patterns because they are no longer needed eventSharedBuffer.release( - newComputationState.getPreviousState().getName(), + NFAStateNameHandler.getOriginalNameFromInternal( + newComputationState.getPreviousState().getName()), newComputationState.getEvent(), newComputationState.getTimestamp(), computationState.getCounter()); @@ -263,10 +265,11 @@ public class NFA implements Serializable { //reached stop state. release entry for the stop state shouldDiscardPath = true; eventSharedBuffer.release( - newCo