dianfu commented on code in PR #20029: URL: https://github.com/apache/flink/pull/20029#discussion_r907935649
########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java: ########## @@ -187,15 +190,17 @@ public enum ConsumingStrategy { public static class Times { private final int from; private final int to; + private final Time windowTime; Review Comment: ```suggestion private final @Nullable Time windowTime; ``` This is missing~ ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -192,6 +204,22 @@ long getWindowTime() { return windowTime.orElse(0L); } + Map<String, Long> getWindowTimes() { + return windowTimes; + } + + /** Check pattern window times between events. */ + private void checkPatternWindowTimes() { Review Comment: Add a test case for this check? ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java: ########## @@ -22,12 +22,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.sharedbuffer.EventId; import org.apache.flink.cep.nfa.sharedbuffer.NodeId; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; /** Snapshot class for {@link NFAStateSerializer}. */ public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> { - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 4; + + private static final int FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP = 3; Review Comment: I guess it should be equal to CURRENT_VERSION at present. ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -164,6 +172,8 @@ void compileFactory() { checkPatternSkipStrategy(); + checkPatternWindowTimes(); Review Comment: I guess this check should be moved after createStartState as windowTimes and windowTime are empty here. ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java: ########## @@ -22,12 +22,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.sharedbuffer.EventId; import org.apache.flink.cep.nfa.sharedbuffer.NodeId; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; /** Snapshot class for {@link NFAStateSerializer}. */ public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> { - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 4; Review Comment: Why change it directly from 1 to 4 instead of 2? ########## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java: ########## @@ -37,6 +41,14 @@ /** Tests for {@link Pattern#timesOrMore(int)}. */ public class TimesOrMoreITCase extends TestLogger { + + @Parameterized.Parameter public Time time; + + @Parameterized.Parameters(name = "Times Range Time: {0}") Review Comment: Are there test cases covering the window timed out occurred in Times? ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java: ########## @@ -273,7 +282,21 @@ public Collection<Map<String, List<T>>> process( new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR); for (ComputationState computationState : nfaState.getPartialMatches()) { - if (isStateTimedOut(computationState, timestamp)) { + String currentStateName = computationState.getCurrentStateName(); + boolean isCurrentStateTimeout = + windowTimes.containsKey(currentStateName) + && isStateTimedOut( + computationState, + timestamp, + computationState.getPreviousTimestamp(), + windowTimes.get(currentStateName)); + boolean isLastStateTimeout = Review Comment: The names `isCurrentStateTimeout ` and `isLastStateTimeout ` are still confusing. ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java: ########## @@ -192,6 +204,22 @@ long getWindowTime() { return windowTime.orElse(0L); } + Map<String, Long> getWindowTimes() { + return windowTimes; + } + + /** Check pattern window times between events. */ + private void checkPatternWindowTimes() { + if (windowTimes.values().stream() Review Comment: What about refactoring it as following? ``` windowTime.ifPresent( windowTime -> { if (windowTimes.values().stream().anyMatch(time -> time > windowTime)) { throw new MalformedPatternException( "The window length between the previous and current event cannot be larger than the window length between the first and last event for a Pattern."); } }); ``` ########## flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java: ########## @@ -131,7 +131,7 @@ static <T> Queue<ComputationState> deserializeComputationStates( computationStates.add( ComputationState.createState( - state, nodeId, version, startTimestamp, startEventId)); + state, nodeId, version, startTimestamp, 0L, startEventId)); Review Comment: Sometimes previousTimestamp is set to 0 and sometimes it set to -1. Could we keep it consistent? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org