[
https://issues.apache.org/jira/browse/FLINK-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946746#comment-15946746
]
ASF GitHub Bot commented on FLINK-6165:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/3621#discussion_r108621295
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -316,49 +341,45 @@ private void convertToSingletonState(final State<T> sourceState, final State<T>
}
/**
- * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+ * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
* PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
* for each PROCEED transition branches in computation state graph can be created only once.
*
- * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
- * to enable combinations.
- *
- * @param sourceState the state to converted
- * @param sinkState the state that the converted state should point to
- * @param isFirstState if the looping state is first of a graph
+ * @param sinkState the state that the converted state should point to
+ * @return the first state of the created complex state
*/
@SuppressWarnings("unchecked")
- private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+ private State<T> createLooping(final State<T> sinkState) {
+ final State<T> loopingState = createNormalState();
final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
- final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
+ final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
- sourceState.addProceed(sinkState, trueFunction);
- sourceState.addTake(filterFunction);
- if (currentPattern instanceof FollowedByPattern || isFirstState) {
- final State<T> ignoreState = new State<>(
- currentPattern.getName(),
- State.StateType.Normal);
+ loopingState.addProceed(sinkState, trueFunction);
+ loopingState.addTake(filterFunction);
+ if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+ final State<T> ignoreState = createNormalState();
final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
- sourceState.addIgnore(ignoreState, ignoreCondition);
- ignoreState.addTake(sourceState, filterFunction);
+ ignoreState.addTake(loopingState, filterFunction);
ignoreState.addIgnore(ignoreState, ignoreCondition);
--- End diff --
Here you could remove the first argument and use
`void addIgnore(final FilterFunction<T> condition)`, right?
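To illustrate the suggestion, here is a minimal sketch of how a one-argument overload can stand in for a self-referencing two-argument call. `SketchState`, `Transition`, and `Action` are hypothetical, simplified stand-ins written for this example, and `java.util.function.Predicate` replaces the condition type; none of this is Flink's actual `State` class or its signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for an NFA state; not Flink's actual State class. */
class SketchState<T> {

    /** The kind of edge between two states. */
    enum Action { TAKE, IGNORE, PROCEED }

    /** One outgoing edge: its action, its target state, and the guarding condition. */
    static final class Transition<T> {
        final Action action;
        final SketchState<T> target;
        final Predicate<T> condition;

        Transition(Action action, SketchState<T> target, Predicate<T> condition) {
            this.action = action;
            this.target = target;
            this.condition = condition;
        }
    }

    private final String name;
    private final List<Transition<T>> transitions = new ArrayList<>();

    SketchState(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    List<Transition<T>> getTransitions() {
        return transitions;
    }

    /** Two-argument form: adds an IGNORE edge to an explicitly given target. */
    void addIgnore(SketchState<T> target, Predicate<T> condition) {
        transitions.add(new Transition<>(Action.IGNORE, target, condition));
    }

    /** One-argument form: adds an IGNORE edge back to this state (a self-loop). */
    void addIgnore(Predicate<T> condition) {
        addIgnore(this, condition);
    }
}
```

Under these assumptions, `ignoreState.addIgnore(ignoreState, ignoreCondition)` and `ignoreState.addIgnore(ignoreCondition)` build the same edge, so the shorter call avoids repeating the receiver as its own target.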
> Implement internal continuity for looping states.
> -------------------------------------------------
>
> Key: FLINK-6165
> URL: https://issues.apache.org/jira/browse/FLINK-6165
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
>
> We should be able to specify an internal continuity for a looping state. The
> API could look like {{zeroOrMore().consecutive()}}, so that we have
> continuity both up to the first element of a loop and between elements in the loop.