[FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries

This closes #3706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ea74a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ea74a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ea74a0

Branch: refs/heads/master
Commit: c0ea74a0a4eedf4fe73e8f383e837ea373216476
Parents: 0506545
Author: Dawid Wysakowicz <[email protected]>
Authored: Mon Apr 10 16:57:48 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 12:22:00 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  5 ++--
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 28 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index ccc6884..43c2aca 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -274,7 +274,6 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
        public void release(final K key, final V value, final long timestamp) {
                SharedBufferEntry<K, V> entry = get(key, value, timestamp);
                if (entry != null) {
-                       entry.decreaseReferenceCounter();
                        internalRemove(entry);
                }
        }
@@ -493,13 +492,13 @@ public class SharedBuffer<K extends Serializable, V> 
implements Serializable {
 
                while (!entriesToRemove.isEmpty()) {
                        SharedBufferEntry<K, V> currentEntry = 
entriesToRemove.pop();
+                       currentEntry.decreaseReferenceCounter();
 
                        if (currentEntry.getReferenceCounter() == 0) {
                                currentEntry.remove();
 
-                               for (SharedBufferEdge<K, V> edge: 
currentEntry.getEdges()) {
+                               for (SharedBufferEdge<K, V> edge : 
currentEntry.getEdges()) {
                                        if (edge.getTarget() != null) {
-                                               
edge.getTarget().decreaseReferenceCounter();
                                                
entriesToRemove.push(edge.getTarget());
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index f0a25d2..adc07b3 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SharedBufferTest extends TestLogger {
@@ -138,4 +139,31 @@ public class SharedBufferTest extends TestLogger {
 
                assertEquals(sharedBuffer, copy);
        }
+
+       @Test
+       public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
+               SharedBuffer<String, Event> sharedBuffer = new 
SharedBuffer<>(Event.createTypeSerializer());
+               int numberEvents = 8;
+               Event[] events = new Event[numberEvents];
+               final long timestamp = 1L;
+
+               for (int i = 0; i < numberEvents; i++) {
+                       events[i] = new Event(i + 1, "e" + (i + 1), i);
+               }
+
+               sharedBuffer.put("start", events[1], timestamp, 
DeweyNumber.fromString("1"));
+               sharedBuffer.put("branching", events[2], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.0"));
+               sharedBuffer.put("branching", events[3], timestamp, "start", 
events[1], timestamp, DeweyNumber.fromString("1.1"));
+               sharedBuffer.put("branching", events[3], timestamp, 
"branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
+               sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
+               sharedBuffer.put("branching", events[4], timestamp, 
"branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
+
+               //simulate IGNORE (next event can point to events[2])
+               sharedBuffer.lock("branching", events[2], timestamp);
+
+               sharedBuffer.release("branching", events[4], timestamp);
+
+               //There should be still events[1] and events[2] in the buffer
+               assertFalse(sharedBuffer.isEmpty());
+       }
 }

Reply via email to