[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events
dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r907935649 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java: ## @@ -187,15 +190,17 @@ public enum ConsumingStrategy { public static class Times { private final int from; private final int to; +private final Time windowTime; Review Comment: ```suggestion private final @Nullable Time windowTime; ``` This is missing~ ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -192,6 +204,22 @@ long getWindowTime() { return windowTime.orElse(0L); } +Map getWindowTimes() { +return windowTimes; +} + +/** Check pattern window times between events. */ +private void checkPatternWindowTimes() { Review Comment: Add a test case for this check? ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java: ## @@ -22,12 +22,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.sharedbuffer.EventId; import org.apache.flink.cep.nfa.sharedbuffer.NodeId; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; /** Snapshot class for {@link NFAStateSerializer}. */ public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot { -private static final int CURRENT_VERSION = 1; +private static final int CURRENT_VERSION = 4; + +private static final int FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP = 3; Review Comment: I guess it should be equal to CURRENT_VERSION at present. ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -164,6 +172,8 @@ void compileFactory() { checkPatternSkipStrategy(); +checkPatternWindowTimes(); Review Comment: I guess this check should be moved after createStartState as windowTimes and windowTime are empty here. 
## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java: ## @@ -22,12 +22,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.sharedbuffer.EventId; import org.apache.flink.cep.nfa.sharedbuffer.NodeId; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; /** Snapshot class for {@link NFAStateSerializer}. */ public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot { -private static final int CURRENT_VERSION = 1; +private static final int CURRENT_VERSION = 4; Review Comment: Why change it directly from 1 to 4 instead of 2? ## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java: ## @@ -37,6 +41,14 @@ /** Tests for {@link Pattern#timesOrMore(int)}. */ public class TimesOrMoreITCase extends TestLogger { + +@Parameterized.Parameter public Time time; + +@Parameterized.Parameters(name = "Times Range Time: {0}") Review Comment: Are there test cases covering a window timeout occurring within Times? ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -273,7 +282,21 @@ public Collection>> process( new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR); for (ComputationState computationState : nfaState.getPartialMatches()) { -if (isStateTimedOut(computationState, timestamp)) { +String currentStateName = computationState.getCurrentStateName(); +boolean isCurrentStateTimeout = +windowTimes.containsKey(currentStateName) +&& isStateTimedOut( +computationState, +timestamp, +computationState.getPreviousTimestamp(), +windowTimes.get(currentStateName)); +boolean isLastStateTimeout = Review Comment: The names `isCurrentStateTimeout` and `isLastStateTimeout` are still confusing. 
## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -192,6 +204,22 @@ long getWindowTime() { return windowTime.orElse(0L); } +Map getWindowTimes() { +return windowTimes; +} + +/** Check pattern window times between events. */ +private void checkPatternWindowTimes() { +if (windowTimes.values().stream() Review Comment: What about refactoring it as follows? ``` windowTime.ifPresent(
[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events
dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r906937708 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -579,6 +662,21 @@ private void checkIfPreviousPatternGreedy() { } } +private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { Review Comment: What about moving this validation to NFAFactoryCompiler.compileFactory? The window time of `FIRST_AND_LAST` may be defined in another Pattern object, which is only available while compiling the Pattern. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events
dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r906937708 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -579,6 +662,21 @@ private void checkIfPreviousPatternGreedy() { } } +private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { Review Comment: What about moving this validation to NFAFactoryCompiler.compileFactory? The window time of `FIRST_AND_LAST` may be defined in another Pattern object, which is only available while compiling the Pattern. ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -394,14 +447,28 @@ public Pattern times(int times) { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern times(int from, int to) { +return times(from, to, null); +} + +/** + * Specifies that the pattern can occur between from and to times with time interval corresponds + * to the maximum time gap between previous and current event for each times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @param windowTime time of the matching window between times + * @return The same pattern with the number of times range applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. 
+ */ +public Pattern times(int from, int to, Time windowTime) { Review Comment: ```suggestion public Pattern times(int from, int to, @Nullable Time windowTime) { ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java: ## @@ -206,12 +209,16 @@ public int getTo() { return to; } -public static Times of(int from, int to) { -return new Times(from, to); +public Time getWindowTime() { +return windowTime; } -public static Times of(int times) { -return new Times(times, times); +public static Times of(int from, int to, Time windowTime) { Review Comment: ```suggestion public static Times of(int from, int to, @Nullable Time windowTime) { ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -137,6 +144,8 @@ public static boolean canProduceEmptyMatches(final Pattern pattern) { private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler(); private final Map> stopStates = new HashMap<>(); private final List> states = new ArrayList<>(); +private final Map ignoreTimes = new HashMap<>(); Review Comment: ignoreTimes is not used ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java: ## @@ -112,6 +112,7 @@ static Queue deserializeComputationStates( long timestamp = timestampSerializer.deserialize(source); DeweyNumber version = versionSerializer.deserialize(source); long startTimestamp = timestampSerializer.deserialize(source); +long previousTimestamp = timestampSerializer.deserialize(source); Review Comment: ```suggestion long previousTimestamp = -1; ``` This is to allow migrating state from checkpoint/savepoint generated prior to Flink 1.5 and so I guess we could not just deserialize from it. 
## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java: ## @@ -41,6 +41,9 @@ public class ComputationState { // Timestamp of the first element in the pattern private final long startTimestamp; +// Timestamp of the previous element in the state Review Comment: ```suggestion // Timestamp of the previous element of the pattern ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -414,10 +481,26 @@ public Pattern times(int from, int to) { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern timesOrMore(int times) { +return timesOrMore(times, null); +} + +/** + * Specifies that this pattern can occur the specified times at least with interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least the specified times and at most infinite number of events can be matched to this + * pattern. + * + * @param times number of times at least matching event must appear + * @param windowTime time of the matching window between times + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applie
[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events
dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r904446329 ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() { } } +private void checkWindowTimeBetweenEvents(Time windowTime, WithinType withinType) { +if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType) +&& windowTimes.containsKey(WithinType.FIRST_AND_LAST) +&& windowTime.toMilliseconds() +> windowTimes.get(WithinType.FIRST_AND_LAST).toMilliseconds()) { +throw new MalformedPatternException( +"Window length between the previous and current event cannot be larger than which between the first and last event for pattern."); Review Comment: ```suggestion "Window length between the previous and current event cannot be larger than the window length between the first and last event for pattern."); ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -348,10 +370,28 @@ public Pattern optional() { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern oneOrMore() { +return oneOrMore(null); +} + +/** + * Specifies that this pattern can occur {@code one or more} times and time interval corresponds + * to the maximum time gap between previous and current event for each times. This means at + * least one and at most infinite number of events can be matched to this pattern. + * + * If this quantifier is enabled for a pattern {@code A.oneOrMore().followedBy(B)} and a + * sequence of events {@code A1 A2 B} appears, this will generate patterns: {@code A1 B} and + * {@code A1 A2 B}. See also {@link #allowCombinations()}. + * + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier + * applied. 
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ +public Pattern oneOrMore(Map windowTimes) { Review Comment: I guess *oneOrMore(Time windowTime)* is enough. ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java: ## @@ -394,14 +447,28 @@ public Pattern times(int times) { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern times(int from, int to) { +return times(from, to, null); +} + +/** + * Specifies that the pattern can occur between from and to times with time interval corresponds + * to the maximum time gap between previous and current event for each times. + * + * @param from number of times matching event must appear at least + * @param to number of times matching event must appear at most + * @param windowTimes mapping between times and time of the matching window. + * @return The same pattern with the number of times range applied + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ +public Pattern times(int from, int to, Map windowTimes) { Review Comment: ditto ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ## @@ -174,6 +182,7 @@ void compileFactory() { if (lastPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW +&& !windowTimes.containsKey(lastPattern.getName()) Review Comment: && (!windowTimes.containsKey(lastPattern.getName()) || windowTimes.get(lastPattern.getName()) <= 0) ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -613,6 +643,7 @@ private Collection computeNextStates( int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); +final long stateTimestamp = event.getTimestamp(); Review Comment: Move it into `case TAKE`? 
## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java: ## @@ -41,6 +41,9 @@ public class ComputationState { // Timestamp of the first element in the pattern private final long startTimestamp; +// Timestamp of the previous element in the state +private final long stateTimestamp; Review Comment: ```suggestion private final long previousTimestamp; ``` ## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ## @@ -91,6 +92,13 @@ */ private final Map> states;