[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101358#comment-16101358
 ] 

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129515022
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
                                                                nextVersion,
                                                                startTimestamp);
                                        }
    +
    +                                   switch (skipStrategy.getStrategy()) {
    +                                           case SKIP_PAST_LAST_ROW:
    +                                                   if (nextState.isFinal()) {
    +                                                           resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +                                                   }
    +                                                   break;
    +                                           case SKIP_TO_FIRST:
    +                                                   if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +                                                           !nextState.getName().equals(currentState.getName())) {
    +                                                           ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    +                                                           if (callLevel > 0) {
    +                                                                   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +                                                           }
    +                                                           // feed current matched event to the state.
    +                                                           Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    --- End diff --
    
    SKIP_TO_FIRST and SKIP_TO_LAST need to start the next match process at the 
first or last event matched by the specified pattern. For example, take the 
event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If we set the 
SkipStrategy to SKIP_TO_FIRST with the pattern name `B`, we should create a new 
`startComputationState` once `b1` has been processed, and the next match 
should start at event `b1`. So we need to manually feed `b1` to the newly 
created `startComputationState`.
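
    To make the resume positions concrete, here is a minimal standalone sketch. 
It is not the NFA code from this pull request: the class `AfterMatchSkipSketch`, 
the `Strategy` enum and the `resumeIndex` method are hypothetical names used 
only for illustration, and a match is assumed to be given as a map from 
pattern name to the stream indices of the events it matched.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: hypothetical names, not part of the Flink CEP API.
final class AfterMatchSkipSketch {

    enum Strategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream index at which the next match attempt should start.
     *
     * @param match pattern name -> stream indices of the events it matched, in stream order
     * @param rpv   pattern name, used only by SKIP_TO_FIRST and SKIP_TO_LAST
     */
    static int resumeIndex(Strategy strategy, Map<String, List<Integer>> match, String rpv) {
        if (match.isEmpty()) {
            throw new IllegalArgumentException("empty match");
        }
        // Find the first and last row of the whole match.
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        for (List<Integer> rows : match.values()) {
            for (int row : rows) {
                first = Math.min(first, row);
                last = Math.max(last, row);
            }
        }
        switch (strategy) {
            case SKIP_TO_NEXT_ROW:
                return first + 1; // row after the first row of the match
            case SKIP_PAST_LAST_ROW:
                return last + 1;  // row after the last row of the match
            case SKIP_TO_FIRST:
                return rowsOf(match, rpv).get(0); // first row mapped to rpv
            case SKIP_TO_LAST: {
                List<Integer> rows = rowsOf(match, rpv);
                return rows.get(rows.size() - 1); // last row mapped to rpv
            }
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    private static List<Integer> rowsOf(Map<String, List<Integer>> match, String rpv) {
        List<Integer> rows = match.get(rpv);
        if (rows == null || rows.isEmpty()) {
            throw new IllegalArgumentException("no event matched pattern " + rpv);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Stream a1, b1, c1, a2 (indices 0..3), pattern (A B C) matched a1, b1, c1.
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("A", Arrays.asList(0));
        match.put("B", Arrays.asList(1));
        match.put("C", Arrays.asList(2));
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + resumeIndex(s, match, "B"));
        }
    }
}
```

    For the stream above, SKIP_TO_FIRST with pattern name `B` yields index 1, 
that is `b1`. Because `b1` has already been consumed by the time the match is 
detected, it has to be fed again to the new start state.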


> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
>                 Key: FLINK-7169
>                 URL: https://issues.apache.org/jira/browse/FLINK-7169
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Yueting Chen
>            Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class, which takes an 
> additional parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is 
> how the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
