[
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106769#comment-16106769
]
ASF GitHub Bot commented on FLINK-7169:
---------------------------------------
Github user dianfu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4331#discussion_r130266394
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
                             nextVersion,
                             startTimestamp);
                     }
+
+                    switch (skipStrategy.getStrategy()) {
+                        case SKIP_PAST_LAST_EVENT:
+                            if (nextState.isFinal()) {
+                                resultingComputationStates.add(createStartComputationState(computationState, event));
+                            }
+                            break;
+                        case SKIP_TO_FIRST:
+                            if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                                !nextState.getName().equals(currentState.getName())) {
+                                ComputationState<T> startComputationState = createStartComputationState(computationState, event);
+                                if (callLevel > 0) {
+                                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                                }
+                                // feed current matched event to the state.
+                                Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                                resultingComputationStates.addAll(computationStates);
+                            } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
+                                throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                            }
+                            break;
+                        case SKIP_TO_LAST:
+                            if (currentState.getName().equals(skipStrategy.getPatternName()) &&
+                                !nextState.getName().equals(currentState.getName())) {
+                                ComputationState<T> startComputationState = createStartComputationState(computationState, event);
+                                if (callLevel > 0) {
+                                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                                }
+                                // feed current matched event to the state.
+                                Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+                                resultingComputationStates.addAll(computationStates);
+                            }
+                            break;
+                    }
                     break;
             }
         }
-        if (computationState.isStartState()) {
-            int totalBranches = calculateIncreasingSelfState(
-                    outgoingEdges.getTotalIgnoreBranches(),
-                    outgoingEdges.getTotalTakeBranches());
-
-            DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-            ComputationState<T> startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
-            resultingComputationStates.add(startState);
+        if (computationState.isStartState() &&
+            skipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT) {
--- End diff --
If the skip strategy is **SKIP_PAST_LAST_EVENT**, then for the pattern **a nextby
b** and the inputs **a1 c a2 b2** the output should be **a2 b2**. With the
current implementation, however, I'm afraid there will be no output: once **a1**
is processed, the start ComputationState is consumed, and there is no later
opportunity to add it back, so **a2 b2** cannot be matched. Right? I'm afraid
the other skip strategies may have the same issue.
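
To make the concern concrete, here is a minimal toy model of a strict "a followed immediately by b" matcher. It is not Flink's NFA and uses none of its classes: `StartStateDemo`, `match`, and the `keepStartState` flag are hypothetical names introduced only for illustration. It assumes that a start state, once consumed by an **a**, comes back only when a match completes (as the SKIP_PAST_LAST_EVENT branch in the diff does) unless `keepStartState` is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartStateDemo {

    /**
     * Toy matcher for the strict pattern "a then b on the very next event".
     * The first character of each event is its type ("a1" has type 'a').
     * If keepStartState is false, the start state disappears once an 'a'
     * consumes it and is restored only when a match completes.
     */
    static List<String> match(List<String> events, boolean keepStartState) {
        List<String> matches = new ArrayList<>();
        boolean startStateAlive = true; // can a new partial match begin?
        String pendingA = null;         // partial match waiting for a 'b'

        for (String event : events) {
            char type = event.charAt(0);
            String nextPending = null;

            if (pendingA != null && type == 'b') {
                matches.add(pendingA + " " + event);
                // A completed match restores the start state.
                startStateAlive = true;
            }
            // Any pending 'a' is dropped here: either it matched, or the
            // current event broke strict contiguity.

            if (startStateAlive && type == 'a') {
                nextPending = event;
                startStateAlive = keepStartState;
            }
            pendingA = nextPending;
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a1", "c", "a2", "b2");
        System.out.println(match(input, false)); // start state lost after a1
        System.out.println(match(input, true));  // start state always available
    }
}
```

Under these assumptions, the run without a persistent start state finds nothing for **a1 c a2 b2**, while the run that keeps the start state finds **a2 b2**.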
> Support AFTER MATCH SKIP function in CEP library API
> ----------------------------------------------------
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Yueting Chen
> Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can add a new method to the `CEP` class that takes an additional
> `AfterMatchSkipStrategy` parameter.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?>
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how
> the CEP library currently behaves.
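
The four AFTER MATCH SKIP options in the issue description each amount to a rule for choosing the row at which matching resumes. The sketch below spells those rules out on plain row indices. It is an illustration only, not the proposed Flink API: the `SkipResumeDemo` class, the `Skip` enum, and `resumeIndex` are hypothetical names, and it assumes the rows of a match and the rows mapped to the pattern variable RPV are given as ascending, non-empty index lists.

```java
import java.util.Arrays;
import java.util.List;

public class SkipResumeDemo {

    /** Hypothetical stand-ins for the four AFTER MATCH SKIP options. */
    enum Skip { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    /**
     * Returns the row index at which pattern matching resumes.
     * matchRows: ascending indices of all rows in the current match.
     * rpvRows:   ascending indices of the rows mapped to the variable RPV.
     */
    static int resumeIndex(Skip skip, List<Integer> matchRows, List<Integer> rpvRows) {
        switch (skip) {
            case TO_NEXT_ROW:
                // Row after the first row of the current match.
                return matchRows.get(0) + 1;
            case PAST_LAST_ROW:
                // Row after the last row of the current match.
                return matchRows.get(matchRows.size() - 1) + 1;
            case TO_FIRST:
                // First row mapped to RPV.
                return rpvRows.get(0);
            case TO_LAST:
                // Last row mapped to RPV.
                return rpvRows.get(rpvRows.size() - 1);
            default:
                throw new IllegalArgumentException("unknown strategy: " + skip);
        }
    }

    public static void main(String[] args) {
        List<Integer> matchRows = Arrays.asList(3, 4, 5, 6); // rows of one match
        List<Integer> rpvRows = Arrays.asList(4, 5);         // rows mapped to RPV
        for (Skip skip : Skip.values()) {
            System.out.println(skip + " -> " + resumeIndex(skip, matchRows, rpvRows));
        }
    }
}
```

For a match over rows 3 to 6 with RPV mapped to rows 4 and 5, the four options resume at rows 4, 7, 4, and 5 respectively.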
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)