dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r906937708


##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -579,6 +662,21 @@ private void checkIfPreviousPatternGreedy() {
         }
     }
 
+    private void checkWindowTimeBetweenEvents(Time windowTime, WithinType 
withinType) {

Review Comment:
   What about moving this validation to NFAFactoryCompilter.compileFactory? It 
window time of `FIRST_AND_LAST` may be defined in other Pattern object which is 
only available during compiling the Pattern. 



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -394,14 +447,28 @@ public Pattern<T, F> times(int times) {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> times(int from, int to) {
+        return times(from, to, null);
+    }
+
+    /**
+     * Specifies that the pattern can occur between from and to times with 
time interval corresponds
+     * to the maximum time gap between previous and current event for each 
times.
+     *
+     * @param from number of times matching event must appear at least
+     * @param to number of times matching event must appear at most
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with the number of times range applied
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> times(int from, int to, Time windowTime) {

Review Comment:
   ```suggestion
       public Pattern<T, F> times(int from, int to, @Nullable Time windowTime) {
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##########
@@ -206,12 +209,16 @@ public int getTo() {
             return to;
         }
 
-        public static Times of(int from, int to) {
-            return new Times(from, to);
+        public Time getWindowTime() {
+            return windowTime;
         }
 
-        public static Times of(int times) {
-            return new Times(times, times);
+        public static Times of(int from, int to, Time windowTime) {

Review Comment:
   ```suggestion
           public static Times of(int from, int to, @Nullable Time windowTime) {
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -137,6 +144,8 @@ public static boolean canProduceEmptyMatches(final 
Pattern<?, ?> pattern) {
         private final NFAStateNameHandler stateNameHandler = new 
NFAStateNameHandler();
         private final Map<String, State<T>> stopStates = new HashMap<>();
         private final List<State<T>> states = new ArrayList<>();
+        private final Map<String, Integer> ignoreTimes = new HashMap<>();

Review Comment:
   ignoreTimes is not used



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java:
##########
@@ -112,6 +112,7 @@ static <T> Queue<ComputationState> 
deserializeComputationStates(
             long timestamp = timestampSerializer.deserialize(source);
             DeweyNumber version = versionSerializer.deserialize(source);
             long startTimestamp = timestampSerializer.deserialize(source);
+            long previousTimestamp = timestampSerializer.deserialize(source);

Review Comment:
   ```suggestion
               long previousTimestamp = -1;
   ```
   
   This is to allow migrating state from checkpoint/savepoint generated prior 
to Flink 1.5 and so I guess we could not just deserialize from it.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java:
##########
@@ -41,6 +41,9 @@ public class ComputationState {
     // Timestamp of the first element in the pattern
     private final long startTimestamp;
 
+    // Timestamp of the previous element in the state

Review Comment:
   ```suggestion
       // Timestamp of the previous element of the pattern
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -414,10 +481,26 @@ public Pattern<T, F> times(int from, int to) {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> timesOrMore(int times) {
+        return timesOrMore(times, null);
+    }
+
+    /**
+     * Specifies that this pattern can occur the specified times at least with 
interval corresponds
+     * to the maximum time gap between previous and current event for each 
times. This means at
+     * least the specified times and at most infinite number of events can be 
matched to this
+     * pattern.
+     *
+     * @param times number of times at least matching event must appear
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+     *     applied.
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> timesOrMore(int times, Time windowTime) {

Review Comment:
   ```suggestion
       public Pattern<T, F> timesOrMore(int times, @Nullable Time windowTime) {
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##########
@@ -223,12 +230,12 @@ public boolean equals(Object o) {
                 return false;
             }
             Times times = (Times) o;
-            return from == times.from && to == times.to;
+            return from == times.from && to == times.to && windowTime == 
times.windowTime;

Review Comment:
   ```suggestion
               return from == times.from && to == times.to && 
windowTime.toMilliseconds() == times.windowTime.toMilliseconds();
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -377,11 +417,24 @@ public Pattern<T, F> greedy() {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> times(int times) {
+        return times(times, null);
+    }
+
+    /**
+     * Specifies exact number of times that this pattern should be matched and 
time interval
+     * corresponds to the maximum time gap between previous and current event 
for each times.
+     *
+     * @param times number of times matching event must appear
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with number of times applied
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> times(int times, Time windowTime) {

Review Comment:
   ```suggestion
       public Pattern<T, F> times(int times, @Nullable Time windowTime) {
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##########
@@ -67,6 +72,15 @@ protected NFAStateSerializer 
createOuterSerializerWithNestedSerializers(
         @SuppressWarnings("unchecked")
         TypeSerializer<EventId> eventIdSerializer = (TypeSerializer<EventId>) 
nestedSerializers[2];
 
-        return new NFAStateSerializer(versionSerializer, nodeIdSerializer, 
eventIdSerializer);
+        return new NFAStateSerializer(
+                versionSerializer, nodeIdSerializer, eventIdSerializer, 
supportsPreviousTimestamp);
+    }
+
+    @Override
+    protected void readOuterSnapshot(
+            int readOuterSnapshotVersion, DataInputView in, ClassLoader 
userCodeClassLoader)
+            throws IOException {
+        super.readOuterSnapshot(readOuterSnapshotVersion, in, 
userCodeClassLoader);
+        supportsPreviousTimestamp = readOuterSnapshotVersion == 
CURRENT_VERSION;

Review Comment:
   Add static variable `FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP` and change this 
to the following:
   ```
           @Override
           protected void readOuterSnapshot(
                   int readOuterSnapshotVersion, DataInputView in, ClassLoader 
userCodeClassLoader)
                   throws IOException {
               if (readOuterSnapshotVersion < 
FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP) {
                   supportsPreviousTimestamp = false;
               } else if (readOuterSnapshotVersion == 
FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP) {
                   supportsPreviousTimestamp = true;
               } else {
                   supportsPreviousTimestamp = in.readBoolean();
               }
           }
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##########
@@ -187,15 +188,17 @@ public enum ConsumingStrategy {
     public static class Times {
         private final int from;
         private final int to;
+        private final Time windowTime;

Review Comment:
   ```suggestion
           private final @Nullable Time windowTime;
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -273,7 +282,21 @@ public Collection<Map<String, List<T>>> process(
                 new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
         for (ComputationState computationState : nfaState.getPartialMatches()) 
{
-            if (isStateTimedOut(computationState, timestamp)) {
+            String currentStateName = computationState.getCurrentStateName();
+            boolean withinStateTimeout =

Review Comment:
   The name *withinStateTimeout* and *isStateTimeout* are not quite clear. What 
about finding some other name, e.g.  isStateTimeoutBetweenEvents vs 
isStateTimeoutWholePattern



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##########
@@ -348,10 +370,28 @@ public Pattern<T, F> optional() {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> oneOrMore() {
+        return oneOrMore(null);
+    }
+
+    /**
+     * Specifies that this pattern can occur {@code one or more} times and 
time interval corresponds
+     * to the maximum time gap between previous and current event for each 
times. This means at
+     * least one and at most infinite number of events can be matched to this 
pattern.
+     *
+     * <p>If this quantifier is enabled for a pattern {@code 
A.oneOrMore().followedBy(B)} and a
+     * sequence of events {@code A1 A2 B} appears, this will generate 
patterns: {@code A1 B} and
+     * {@code A1 A2 B}. See also {@link #allowCombinations()}.
+     *
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+     *     applied.
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> oneOrMore(Time windowTime) {

Review Comment:
   ```suggestion
       public Pattern<T, F> oneOrMore(@Nullable Time windowTime) {
   ```



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -390,10 +405,28 @@ private State<T> convertPattern(final State<T> sinkState) 
{
             return lastSink;
         }
 
+        private State<T> createState(State.StateType stateType, boolean 
notIgnore) {
+            State<T> state = createState(currentPattern.getName(), stateType);
+            if (notIgnore) {

Review Comment:
   What does the variable name *notIgnore* mean? It's not quite clear for me.



##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##########
@@ -67,6 +72,15 @@ protected NFAStateSerializer 
createOuterSerializerWithNestedSerializers(
         @SuppressWarnings("unchecked")
         TypeSerializer<EventId> eventIdSerializer = (TypeSerializer<EventId>) 
nestedSerializers[2];
 
-        return new NFAStateSerializer(versionSerializer, nodeIdSerializer, 
eventIdSerializer);
+        return new NFAStateSerializer(
+                versionSerializer, nodeIdSerializer, eventIdSerializer, 
supportsPreviousTimestamp);
+    }

Review Comment:
   I guess we need implement the following method to allow restoring state 
generated from previous version:
   ```
           @Override
           protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
                   NFAStateSerializer newSerializer) {
               if (supportsPreviousTimestamp != 
newSerializer.supportsPreviousTimestamp) {
                   return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
               }
               return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
           }
   ```
   
   Besides, we should also implement the following method:
   ```
           @Override
           protected void writeOuterSnapshot(DataOutputView out) throws 
IOException {
               out.writeBoolean(supportsPreviousTimestamp);
           }
   ```



##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java:
##########
@@ -37,6 +41,14 @@
 
 /** Tests for {@link Pattern#timesOrMore(int)}. */
 public class TimesOrMoreITCase extends TestLogger {
+
+    @Parameterized.Parameter public Time time;
+
+    @Parameterized.Parameters(name = "Times Range Time: {0}")

Review Comment:
   The results for the following two cases are same? Is this expected?



##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java:
##########
@@ -1560,16 +1570,16 @@ public boolean filter(Event value) throws Exception {
     }
 
     @Test
-    public void testNotFollowByBeforeTimesWithIn() throws Exception {
+    public void testNotFollowByBeforeTimesWithin() throws Exception {
         List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
         Event a1 = new Event(40, "a", 1.0);
         Event b1 = new Event(41, "b", 2.0);
         Event a2 = new Event(42, "a", 3.0);
         Event c1 = new Event(43, "c", 4.0);
         Event c2 = new Event(44, "c", 5.0);
-        Event a3 = new Event(46, "a", 7.0);

Review Comment:
   Why changed this test case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to