Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6171#discussion_r198473858
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -330,77 +328,85 @@ private boolean isStateTimedOut(final 
ComputationState state, final long timesta
                        }
                }
     
    -           discardComputationStatesAccordingToStrategy(
    -                   sharedBuffer, computationStates, result, 
afterMatchSkipStrategy);
    +           if (!potentialMatches.isEmpty()) {
    +                   nfaState.setStateChanged();
    +           }
    +
    +           List<Map<String, List<T>>> result = new ArrayList<>();
    +           if (afterMatchSkipStrategy.isSkipStrategy()) {
    +                   processMatchesAccordingToSkipStrategy(sharedBuffer,
    +                           nfaState,
    +                           afterMatchSkipStrategy,
    +                           potentialMatches,
    +                           result);
    +           } else {
    +                   for (ComputationState match : potentialMatches) {
    +                           
result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
    +                                   match.getVersion()).get(0)));
    +                           
sharedBuffer.releaseNode(match.getPreviousBufferEntry());
    +                   }
    +           }
     
                return result;
        }
     
    -   private void discardComputationStatesAccordingToStrategy(
    -                   final SharedBuffer<T> sharedBuffer,
    -                   final Queue<ComputationState> computationStates,
    -                   final Collection<Map<String, List<T>>> matchedResult,
    -                   final AfterMatchSkipStrategy afterMatchSkipStrategy) 
throws Exception {
    +   private void processMatchesAccordingToSkipStrategy(
    +                   SharedBuffer<T> sharedBuffer,
    +                   NFAState nfaState,
    +                   AfterMatchSkipStrategy afterMatchSkipStrategy,
    +                   PriorityQueue<ComputationState> potentialMatches,
    +                   List<Map<String, List<T>>> result) throws Exception {
     
    -           Set<T> discardEvents = new HashSet<>();
    -           switch(afterMatchSkipStrategy.getStrategy()) {
    -                   case SKIP_TO_LAST:
    -                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    -                                   for (Map.Entry<String, List<T>> 
keyMatches : resultMap.entrySet()) {
    -                                           if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    -                                                   
discardEvents.addAll(keyMatches.getValue().subList(0, 
keyMatches.getValue().size() - 1));
    -                                                   break;
    -                                           } else {
    -                                                   
discardEvents.addAll(keyMatches.getValue());
    -                                           }
    -                                   }
    -                           }
    -                           break;
    -                   case SKIP_TO_FIRST:
    -                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    -                                   for (Map.Entry<String, List<T>> 
keyMatches : resultMap.entrySet()) {
    -                                           if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    -                                                   break;
    -                                           } else {
    -                                                   
discardEvents.addAll(keyMatches.getValue());
    -                                           }
    -                                   }
    -                           }
    -                           break;
    -                   case SKIP_PAST_LAST_EVENT:
    -                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    -                                   for (List<T> eventList: 
resultMap.values()) {
    -                                           discardEvents.addAll(eventList);
    -                                   }
    -                           }
    -                           break;
    -           }
    -           if (!discardEvents.isEmpty()) {
    -                   List<ComputationState> discardStates = new 
ArrayList<>();
    -                   for (ComputationState computationState : 
computationStates) {
    -                           boolean discard = false;
    -                           Map<String, List<T>> partialMatch = 
extractCurrentMatches(sharedBuffer, computationState);
    -                           for (List<T> list: partialMatch.values()) {
    -                                   for (T e: list) {
    -                                           if (discardEvents.contains(e)) {
    -                                                   // discard the 
computation state.
    -                                                   discard = true;
    -                                                   break;
    -                                           }
    -                                   }
    -                                   if (discard) {
    -                                           break;
    -                                   }
    -                           }
    -                           if (discard) {
    -                                   
sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
    -                                   discardStates.add(computationState);
    -                           }
    +           nfaState.getCompletedMatches().addAll(potentialMatches);
    +
    +           ComputationState earliestMatch = 
nfaState.getCompletedMatches().peek();
    +
    +           if (earliestMatch != null) {
    +                   Queue<ComputationState> sortedPartialMatches = 
sortByStartTime(nfaState.getPartialMatches());
    --- End diff --
    
    Instead of sorting every time, why not keep the partial matches in a 
priority queue?


---

Reply via email to