[FLINK-6290] [CEP] Fix SharedBuffer release when having multiple edges between entries
This closes #3706 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0ea74a0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0ea74a0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0ea74a0 Branch: refs/heads/master Commit: c0ea74a0a4eedf4fe73e8f383e837ea373216476 Parents: 0506545 Author: Dawid Wysakowicz <[email protected]> Authored: Mon Apr 10 16:57:48 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Apr 21 12:22:00 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/cep/nfa/SharedBuffer.java | 5 ++-- .../apache/flink/cep/nfa/SharedBufferTest.java | 28 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index ccc6884..43c2aca 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -274,7 +274,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { public void release(final K key, final V value, final long timestamp) { SharedBufferEntry<K, V> entry = get(key, value, timestamp); if (entry != null) { - entry.decreaseReferenceCounter(); internalRemove(entry); } } @@ -493,13 +492,13 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { while (!entriesToRemove.isEmpty()) { SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop(); + currentEntry.decreaseReferenceCounter(); if (currentEntry.getReferenceCounter() == 0) { currentEntry.remove(); - for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) { + for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) { if (edge.getTarget() != null) { - edge.getTarget().decreaseReferenceCounter(); entriesToRemove.push(edge.getTarget()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c0ea74a0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java index f0a25d2..adc07b3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class SharedBufferTest extends TestLogger { @@ -138,4 +139,31 @@ public class SharedBufferTest extends TestLogger { assertEquals(sharedBuffer, copy); } + + @Test + public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() { + SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer()); + int numberEvents = 8; + Event[] events = new Event[numberEvents]; + final long timestamp = 1L; + + for (int i = 0; i < numberEvents; i++) { + events[i] = new Event(i + 1, "e" + (i + 1), i); + } + + sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1")); + sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0")); + sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1")); + sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0")); + sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0")); + sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0")); + + //simulate IGNORE (next event can point to events[2]) + sharedBuffer.lock("branching", events[2], timestamp); + + sharedBuffer.release("branching", events[4], timestamp); + + //There should be still events[1] and events[2] in the buffer + assertFalse(sharedBuffer.isEmpty()); + } }
