[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126650#comment-16126650
 ] 

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133091608
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
                return Tuple2.of(result, timeoutResult);
        }
     
    +   private void 
discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> 
computationStates,
    +           Collection<Map<String, List<T>>> matchedResult, 
AfterMatchSkipStrategy afterMatchSkipStrategy) {
    +           Set<T> discardEvents = new HashSet<>();
    +           switch(afterMatchSkipStrategy.getStrategy()) {
    +                   case SKIP_TO_LAST:
    +                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    +                                   for (Map.Entry<String, List<T>> 
keyMatches : resultMap.entrySet()) {
    +                                           if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +                                                   
discardEvents.addAll(keyMatches.getValue().subList(0, 
keyMatches.getValue().size() - 1));
    +                                                   break;
    +                                           } else {
    +                                                   
discardEvents.addAll(keyMatches.getValue());
    +                                           }
    +                                   }
    +                           }
    +                           break;
    +                   case SKIP_TO_FIRST:
    +                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    +                                   for (Map.Entry<String, List<T>> 
keyMatches : resultMap.entrySet()) {
    +                                           if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +                                                   break;
    +                                           } else {
    +                                                   
discardEvents.addAll(keyMatches.getValue());
    +                                           }
    +                                   }
    +                           }
    +                           break;
    +                   case SKIP_PAST_LAST_EVENT:
    +                           for (Map<String, List<T>> resultMap: 
matchedResult) {
    +                                   for (List<T> eventList: 
resultMap.values()) {
    +                                           discardEvents.addAll(eventList);
    +                                   }
    +                           }
    +                           break;
    +           }
    +           if (!discardEvents.isEmpty()) {
    +                   List<ComputationState<T>> discardStates = new 
ArrayList<>();
    +                   for (ComputationState<T> computationState : 
computationStates) {
    +                           Map<String, List<T>> partialMatch = 
extractCurrentMatches(computationState);
    +                           for (List<T> list: partialMatch.values()) {
    +                                   for (T e: list) {
    +                                           if (discardEvents.contains(e)) {
    +                                                   // discard the 
computation state.
    +                                                   
eventSharedBuffer.release(
    +                                                           
NFAStateNameHandler.getOriginalNameFromInternal(
    +                                                                   
computationState.getState().getName()),
    +                                                           
computationState.getEvent(),
    +                                                           
computationState.getTimestamp(),
    +                                                           
computationState.getCounter()
    +                                                   );
    +                                                   
discardStates.add(computationState);
    --- End diff --
    
    Yes, you are right. Thanks for pointing it out!


> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
>                 Key: FLINK-7169
>                 URL: https://issues.apache.org/jira/browse/FLINK-7169
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Yueting Chen
>            Assignee: Yueting Chen
>             Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to `CEP` class, which takes a new 
> parameter as AfterMatchSKipStrategy.
> The new API may looks like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` as the default option, because that's 
> what CEP library behaves currently.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to