[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100036#comment-16100036
 ] 

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129303049
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> 
edge) {
                                                                nextVersion,
                                                                startTimestamp);
                                        }
    +
    +                                   switch (skipStrategy.getStrategy()) {
    +                                           case SKIP_PAST_LAST_ROW:
    +                                                   if 
(nextState.isFinal()) {
    +                                                           
resultingComputationStates.add(createStartComputationState(computationState, 
outgoingEdges));
    +                                                   }
    +                                                   break;
    +                                           case SKIP_TO_FIRST:
    +                                                   if 
(nextState.getName().equals(skipStrategy.getRpv()) &&
    +                                                           
!nextState.getName().equals(currentState.getName())) {
    +                                                           
ComputationState<T> startComputationState = 
createStartComputationState(computationState, outgoingEdges);
    --- End diff --
    
    The `outgoingEdges` parameter in this case will not work. Previously the 
assumption was that we created the new `start` state always in the starting 
state. So we could calculate the version for new run based on outgoing edges 
from the start state. 
    
    In this case the outgoingEdges do not correspond to the starting state.


> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
>                 Key: FLINK-7169
>                 URL: https://issues.apache.org/jira/browse/FLINK-7169
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Yueting Chen
>            Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to `CEP` class, which takes a new 
> parameter as AfterMatchSKipStrategy.
> The new API may looks like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` as the default option, because that's 
> what CEP library behaves currently.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to