[
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420531#comment-15420531
]
ASF GitHub Bot commented on FLINK-3703:
---------------------------------------
Github user smarthi commented on a diff in the pull request:
https://github.com/apache/flink/pull/2367#discussion_r74711517
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
---
@@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
edge.getTarget(),
edge.getVersion(),
copy));
+ if
(matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
+
cleanUp.add(edge.getTarget());
+ }
}
}
}
}
}
}
+ // Remove shared buffer entries to maintain correct matching
behaviour
+ doCleanUp(new Predicate<K, V>() {
+
+ @Override
+ public boolean toRemove(SharedBufferEntry<K, V> entry) {
+ return cleanUp.contains(entry);
+ }
+ });
+ // Remove all entries that are dependent on the current event
+ if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
+ doCleanUp(new Predicate<K, V>() {
+
+ @Override
+ public boolean toRemove(SharedBufferEntry<K, V>
entry) {
+ if (entry == null) {
+ return false;
+ }
+ return entry.getValueTime().value ==
value
+ &&
entry.getValueTime().timestamp == timestamp;
+ }
+ });
+ }
+
return result;
}
+ private void doCleanUp(Predicate<K, V> predicate) {
+ ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
+ for (SharedBufferPage<K, V> page : this.pages.values()) {
+ for (SharedBufferEntry<K, V> entry : page.getEntries())
{
+ if (entry.getReferenceCounter() <= 1) {
+ doRecursiveCleanup(entry, predicate,
toRemove);
+ }
+ }
+ }
+
+ for (SharedBufferEntry<K, V> startNode: toRemove) {
+ release(startNode.page.getKey(),
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
+ remove(startNode.page.getKey(),
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
+ }
+ }
+
+ private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode,
Predicate<K, V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) {
--- End diff --
Replace ArrayList with List in the parameter types, unless an ArrayList is
explicitly required.
> Add sequence matching semantics to discard matched events
> ---------------------------------------------------------
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Till Rohrmann
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple
> matching sequences or not. Currently, the default is that an event can
> participate in multiple matching sequences. E.g. if you have the pattern
> {{Pattern.<Event>begin("a").followedBy("b")}} and the input event stream
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}}
> and {{Event("B"), Event("C")}}.
> It would be useful to allow the user to define where the matching algorithm
> should continue after a matching sequence has been found. Possible option
> values could be:
> * {{from first}} - continue keeping all events for future matches (that is
> the current behaviour)
> * {{after first}} - continue after the first element (remove the first
> matching event and continue with the second event)
> * {{after last}} - continue after the last element (effectively discarding
> all elements of the matching sequence)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)