dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r907935649


##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##########
@@ -187,15 +190,17 @@ public enum ConsumingStrategy {
     public static class Times {
         private final int from;
         private final int to;
+        private final Time windowTime;

Review Comment:
   ```suggestion
           private final @Nullable Time windowTime;
   ```
   This is missing~



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -192,6 +204,22 @@ long getWindowTime() {
             return windowTime.orElse(0L);
         }
 
+        Map<String, Long> getWindowTimes() {
+            return windowTimes;
+        }
+
+        /** Check pattern window times between events. */
+        private void checkPatternWindowTimes() {

Review Comment:
   Add a test case for this check?



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##########
@@ -22,12 +22,20 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 
 /** Snapshot class for {@link NFAStateSerializer}. */
 public class NFAStateSerializerSnapshot
         extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> {
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 4;
+
+    private static final int FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP = 3;

Review Comment:
   I guess it should be equal to CURRENT_VERSION at present.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -164,6 +172,8 @@ void compileFactory() {
 
             checkPatternSkipStrategy();
 
+            checkPatternWindowTimes();

Review Comment:
   I guess this check should be moved after createStartState as windowTimes and 
windowTime are empty here.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##########
@@ -22,12 +22,20 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 
 /** Snapshot class for {@link NFAStateSerializer}. */
 public class NFAStateSerializerSnapshot
         extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> {
 
-    private static final int CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 4;

Review Comment:
   Why change it directly from 1 to 4 instead of 2?



##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java:
##########
@@ -37,6 +41,14 @@
 
 /** Tests for {@link Pattern#timesOrMore(int)}. */
 public class TimesOrMoreITCase extends TestLogger {
+
+    @Parameterized.Parameter public Time time;
+
+    @Parameterized.Parameters(name = "Times Range Time: {0}")

Review Comment:
   Are there test cases covering the window timed out occurred in Times?



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -273,7 +282,21 @@ public Collection<Map<String, List<T>>> process(
                 new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
         for (ComputationState computationState : nfaState.getPartialMatches()) 
{
-            if (isStateTimedOut(computationState, timestamp)) {
+            String currentStateName = computationState.getCurrentStateName();
+            boolean isCurrentStateTimeout =
+                    windowTimes.containsKey(currentStateName)
+                            && isStateTimedOut(
+                                    computationState,
+                                    timestamp,
+                                    computationState.getPreviousTimestamp(),
+                                    windowTimes.get(currentStateName));
+            boolean isLastStateTimeout =

Review Comment:
   The names `isCurrentStateTimeout ` and `isLastStateTimeout ` are still 
confusing.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -192,6 +204,22 @@ long getWindowTime() {
             return windowTime.orElse(0L);
         }
 
+        Map<String, Long> getWindowTimes() {
+            return windowTimes;
+        }
+
+        /** Check pattern window times between events. */
+        private void checkPatternWindowTimes() {
+            if (windowTimes.values().stream()

Review Comment:
   What about refactoring it as following?
   ```
               windowTime.ifPresent(
                       windowTime -> {
                           if (windowTimes.values().stream().anyMatch(time -> 
time > windowTime)) {
                               throw new MalformedPatternException(
                                       "The window length between the previous 
and current event cannot be larger than the window length between the first and 
last event for a Pattern.");
                           }
                       });
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java:
##########
@@ -131,7 +131,7 @@ static <T> Queue<ComputationState> 
deserializeComputationStates(
 
             computationStates.add(
                     ComputationState.createState(
-                            state, nodeId, version, startTimestamp, 
startEventId));
+                            state, nodeId, version, startTimestamp, 0L, 
startEventId));

Review Comment:
   Sometimes previousTimestamp is set to 0 and sometimes it set to -1. Could we 
keep it consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to