[ https://issues.apache.org/jira/browse/FLINK-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946746#comment-15946746 ]

ASF GitHub Bot commented on FLINK-6165:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3621#discussion_r108621295
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -316,49 +341,45 @@ private void convertToSingletonState(final State<T> sourceState, final State<T>
                }
     
                /**
    -            * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
    +            * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
                 * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
                 * for each PROCEED transition branches in computation state graph  can be created only once.
                 *
    -            * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
    -            * to enable combinations.
    -            *
    -            * @param sourceState  the state to converted
    -            * @param sinkState    the state that the converted state should point to
    -            * @param isFirstState if the looping state is first of a graph
    +            * @param sinkState the state that the converted state should point to
    +            * @return the first state of the created complex state
                 */
                @SuppressWarnings("unchecked")
    -           private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
    +           private State<T> createLooping(final State<T> sinkState) {
     
    +                   final State<T> loopingState = createNormalState();
                        final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
    -                   final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
    +                   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
     
    -                   sourceState.addProceed(sinkState, trueFunction);
    -                   sourceState.addTake(filterFunction);
    -                   if (currentPattern instanceof FollowedByPattern || isFirstState) {
    -                           final State<T> ignoreState = new State<>(
    -                                   currentPattern.getName(),
    -                                   State.StateType.Normal);
    +                   loopingState.addProceed(sinkState, trueFunction);
    +                   loopingState.addTake(filterFunction);
    +                   if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
    +                           final State<T> ignoreState = createNormalState();
     
                                final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
     
    -                           sourceState.addIgnore(ignoreState, ignoreCondition);
    -                           ignoreState.addTake(sourceState, filterFunction);
    +                           ignoreState.addTake(loopingState, filterFunction);
                                ignoreState.addIgnore(ignoreState, ignoreCondition);
    --- End diff --
    
    Here you could drop the first argument and use `void addIgnore(final FilterFunction<T> condition)`, right? In this call `ignoreState` is both the source and the target of the edge.
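
The suggestion relies on an overload that adds an edge from a state to itself. A minimal sketch of how such an overload can delegate to the two-argument form is shown here. The names `SelfLoopSketch`, `State`, `Edge`, and `Action` are hypothetical stand-ins, and `java.util.function.Predicate` replaces Flink's condition type; none of this is the actual `NFACompiler` or `State<T>` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch only: hypothetical stand-ins, not Flink's State<T> or IterativeCondition<T>.
final class SelfLoopSketch {

    enum Action { TAKE, IGNORE, PROCEED }

    /** One outgoing edge of a state: the action, the target state, and the guarding condition. */
    static final class Edge<T> {
        final Action action;
        final State<T> target;
        final Predicate<T> condition;

        Edge(Action action, State<T> target, Predicate<T> condition) {
            this.action = action;
            this.target = target;
            this.condition = condition;
        }
    }

    static final class State<T> {
        final String name;
        final List<Edge<T>> edges = new ArrayList<>();

        State(String name) {
            this.name = name;
        }

        /** General form: IGNORE edge from this state to an explicit target. */
        void addIgnore(State<T> target, Predicate<T> condition) {
            edges.add(new Edge<>(Action.IGNORE, target, condition));
        }

        /** Shorthand: IGNORE edge from this state back to itself. */
        void addIgnore(Predicate<T> condition) {
            addIgnore(this, condition);
        }
    }

    public static void main(String[] args) {
        Predicate<String> ignoreCondition = s -> !s.startsWith("a");

        // Explicit self-target, as written in the diff.
        State<String> explicit = new State<>("ignore");
        explicit.addIgnore(explicit, ignoreCondition);

        // One-argument shorthand, as suggested in the review comment.
        State<String> shorthand = new State<>("ignore");
        shorthand.addIgnore(ignoreCondition);

        // Both states end up with a single IGNORE edge pointing at themselves.
        System.out.println(explicit.edges.get(0).target == explicit);
        System.out.println(shorthand.edges.get(0).target == shorthand);
    }
}
```

Under these assumptions both calls build the same edge, so the shorter form only removes the repeated `ignoreState` at the call site.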


> Implement internal continuity for looping states.
> -------------------------------------------------
>
>                 Key: FLINK-6165
>                 URL: https://issues.apache.org/jira/browse/FLINK-6165
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>
> We should be able to specify internal continuity for a looping state. The
> API could look like {{zeroOrMore().consecutive()}}, so that we have one
> continuity up to the first element of a loop and another between the elements in the loop.
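
As a rough illustration of the difference between strict and relaxed continuity between loop elements, here is a simplified sketch. It is not Flink CEP code: `LoopContinuitySketch` and `longestLoop` are hypothetical names, only the continuity between elements inside the loop is modelled (not the continuity up to the first element), and only the longest run is returned rather than every match an NFA would emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch only; a simplification, not the Flink CEP implementation.
final class LoopContinuitySketch {

    /**
     * Collects loop elements starting at the first event accepted by the condition.
     * With consecutive == true (strict), the loop ends at the first non-matching
     * event after it has started. With consecutive == false (relaxed),
     * non-matching events are skipped and the loop continues.
     */
    static <T> List<T> longestLoop(List<T> events, Predicate<T> condition, boolean consecutive) {
        List<T> taken = new ArrayList<>();
        for (T event : events) {
            if (condition.test(event)) {
                taken.add(event);
            } else if (consecutive && !taken.isEmpty()) {
                break; // strict continuity: a gap ends the loop
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a1", "a2", "b1", "a3");
        Predicate<String> isA = e -> e.startsWith("a");

        System.out.println(longestLoop(events, isA, true));  // prints [a1, a2]
        System.out.println(longestLoop(events, isA, false)); // prints [a1, a2, a3]
    }
}
```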



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
