flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.

2017-06-08 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/release-1.3 b2d6dc1d4 -> d24515fee


[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d24515fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d24515fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d24515fe

Branch: refs/heads/release-1.3
Commit: d24515fee29fc8924359ab6493e7c8e05ac9b173
Parents: b2d6dc1
Author: kkloudas 
Authored: Thu Jun 8 16:24:35 2017 +0200
Committer: kkloudas 
Committed: Thu Jun 8 16:24:35 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 138 ++-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  49 ---
 .../flink/cep/nfa/compiler/NFACompiler.java |  74 +++---
 .../cep/nfa/compiler/NFAStateNameHandler.java   |  79 +++
 .../org/apache/flink/cep/nfa/NFAITCase.java | 110 +++
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  79 +++
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   2 +-
 7 files changed, 334 insertions(+), 197 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d24515fe/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 2be09ad..cac1601 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -29,13 +26,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -44,7 +42,11 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
 import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -68,27 +70,26 @@ import java.util.Stack;
 
 /**
  * Non-deterministic finite automaton implementation.
- * 
- * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator 
CEP operator}
+ *
+ * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator 
CEP operator}
  * keeps one NFA per key, for keyed input streams, and a single global NFA for 
non-keyed ones.
  * When an event gets processed, it updates the NFA's internal state machine.
- * 
- * An event that belongs to a partially matched sequence is kept in an internal
+ *
+ * An event that belongs to a partially matched sequence is kept in an 
internal
  * {@link SharedBuffer buffer}, which is a memory-optimized data-structure 
exactly for
  * this purpose. Events in the buffer are removed when all the matched 
sequences that
  * contain them are:
  * 
- * emitted (success)
- * discarded (patterns containing NOT)
- * timed-out (windowed patterns)
+ *  emitted (success)
+ *  discarded (patterns containing NOT)
+ *  timed-out (windowed patterns)
  * 
  *
- * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
- *
- * @see https://people.cs.umass.edu/~yanlei/publications/sase-

flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.

2017-06-08 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master bcaf816dc -> 5d3506e88


[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8

Branch: refs/heads/master
Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b
Parents: bcaf816
Author: kkloudas 
Authored: Wed May 31 17:48:34 2017 +0200
Committer: kkloudas 
Committed: Thu Jun 8 15:41:38 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 26 +++---
 .../flink/cep/nfa/compiler/NFACompiler.java | 45 ++
 .../cep/nfa/compiler/NFAStateNameHandler.java   | 79 ++
 .../org/apache/flink/cep/nfa/NFAITCase.java | 64 ++
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 73 +---
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  2 +-
 7 files changed, 255 insertions(+), 122 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f438915..cac1601 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
 
 import javax.annotation.Nullable;
 
@@ -231,7 +231,7 @@ public class NFA implements Serializable {
}
 
eventSharedBuffer.release(
-   
computationState.getPreviousState().getName(),
+   
NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getCounter());
@@ -248,6 +248,7 @@ public class NFA implements Serializable {
//if stop state reached in this path
boolean shouldDiscardPath = false;
for (final ComputationState newComputationState: 
newComputationStates) {
+
if (newComputationState.isFinalState()) {
// we've reached a final state and can 
thus retrieve the matching event sequence
Map> matchedPattern = 
extractCurrentMatches(newComputationState);
@@ -255,7 +256,8 @@ public class NFA implements Serializable {
 
// remove found patterns because they 
are no longer needed
eventSharedBuffer.release(
-   
newComputationState.getPreviousState().getName(),
+   
NFAStateNameHandler.getOriginalNameFromInternal(
+   
newComputationState.getPreviousState().getName()),

newComputationState.getEvent(),

newComputationState.getTimestamp(),

computationState.getCounter());
@@ -263,10 +265,11 @@ public class NFA implements Serializable {
//reached stop state. release entry for 
the stop state
shouldDiscardPath = true;
eventSharedBuffer.release(
-   
newCo