[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events

2022-06-27 Thread GitBox


dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r907935649


##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##
@@ -187,15 +190,17 @@ public enum ConsumingStrategy {
 public static class Times {
 private final int from;
 private final int to;
+private final Time windowTime;

Review Comment:
   ```suggestion
   private final @Nullable Time windowTime;
   ```
   The `@Nullable` annotation is missing here.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -192,6 +204,22 @@ long getWindowTime() {
 return windowTime.orElse(0L);
 }
 
+Map getWindowTimes() {
+return windowTimes;
+}
+
+/** Check pattern window times between events. */
+private void checkPatternWindowTimes() {

Review Comment:
   Add a test case for this check?



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##
@@ -22,12 +22,20 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 
 /** Snapshot class for {@link NFAStateSerializer}. */
 public class NFAStateSerializerSnapshot
 extends CompositeTypeSerializerSnapshot {
 
-private static final int CURRENT_VERSION = 1;
+private static final int CURRENT_VERSION = 4;
+
+private static final int FIRST_VERSION_WITH_PREVIOUS_TIMESTAMP = 3;

Review Comment:
   I guess it should be equal to CURRENT_VERSION at present.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -164,6 +172,8 @@ void compileFactory() {
 
 checkPatternSkipStrategy();
 
+checkPatternWindowTimes();

Review Comment:
   I guess this check should be moved after createStartState, because windowTimes and
windowTime are still empty at this point.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java:
##
@@ -22,12 +22,20 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 
 /** Snapshot class for {@link NFAStateSerializer}. */
 public class NFAStateSerializerSnapshot
 extends CompositeTypeSerializerSnapshot {
 
-private static final int CURRENT_VERSION = 1;
+private static final int CURRENT_VERSION = 4;

Review Comment:
   Why change it directly from 1 to 4 instead of 2?



##
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java:
##
@@ -37,6 +41,14 @@
 
 /** Tests for {@link Pattern#timesOrMore(int)}. */
 public class TimesOrMoreITCase extends TestLogger {
+
+@Parameterized.Parameter public Time time;
+
+@Parameterized.Parameters(name = "Times Range Time: {0}")

Review Comment:
   Are there test cases covering a window timeout that occurs within Times?



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##
@@ -273,7 +282,21 @@ public Collection>> process(
 new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
 for (ComputationState computationState : nfaState.getPartialMatches()) 
{
-if (isStateTimedOut(computationState, timestamp)) {
+String currentStateName = computationState.getCurrentStateName();
+boolean isCurrentStateTimeout =
+windowTimes.containsKey(currentStateName)
+&& isStateTimedOut(
+computationState,
+timestamp,
+computationState.getPreviousTimestamp(),
+windowTimes.get(currentStateName));
+boolean isLastStateTimeout =

Review Comment:
   The names `isCurrentStateTimeout ` and `isLastStateTimeout ` are still 
confusing.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -192,6 +204,22 @@ long getWindowTime() {
 return windowTime.orElse(0L);
 }
 
+Map getWindowTimes() {
+return windowTimes;
+}
+
+/** Check pattern window times between events. */
+private void checkPatternWindowTimes() {
+if (windowTimes.values().stream()

Review Comment:
   What about refactoring it as following?
   ```
   windowTime.ifPresent(
   

[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events

2022-06-27 Thread GitBox


dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r906937708


##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -579,6 +662,21 @@ private void checkIfPreviousPatternGreedy() {
 }
 }
 
+private void checkWindowTimeBetweenEvents(Time windowTime, WithinType 
withinType) {

Review Comment:
   What about moving this validation to NFAFactoryCompiler.compileFactory? The
window time of `FIRST_AND_LAST` may be defined in another Pattern object, which is
only available when the Pattern is compiled. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events

2022-06-27 Thread GitBox


dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r906937708


##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -579,6 +662,21 @@ private void checkIfPreviousPatternGreedy() {
 }
 }
 
+private void checkWindowTimeBetweenEvents(Time windowTime, WithinType 
withinType) {

Review Comment:
   What about moving this validation to NFAFactoryCompiler.compileFactory? The
window time of `FIRST_AND_LAST` may be defined in another Pattern object, which is
only available when the Pattern is compiled. 



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -394,14 +447,28 @@ public Pattern times(int times) {
  * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
  */
 public Pattern times(int from, int to) {
+return times(from, to, null);
+}
+
+/**
+ * Specifies that the pattern can occur between from and to times with 
time interval corresponds
+ * to the maximum time gap between previous and current event for each 
times.
+ *
+ * @param from number of times matching event must appear at least
+ * @param to number of times matching event must appear at most
+ * @param windowTime time of the matching window between times
+ * @return The same pattern with the number of times range applied
+ * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+ */
+public Pattern times(int from, int to, Time windowTime) {

Review Comment:
   ```suggestion
   public Pattern times(int from, int to, @Nullable Time windowTime) {
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java:
##
@@ -206,12 +209,16 @@ public int getTo() {
 return to;
 }
 
-public static Times of(int from, int to) {
-return new Times(from, to);
+public Time getWindowTime() {
+return windowTime;
 }
 
-public static Times of(int times) {
-return new Times(times, times);
+public static Times of(int from, int to, Time windowTime) {

Review Comment:
   ```suggestion
   public static Times of(int from, int to, @Nullable Time windowTime) {
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -137,6 +144,8 @@ public static boolean canProduceEmptyMatches(final 
Pattern pattern) {
 private final NFAStateNameHandler stateNameHandler = new 
NFAStateNameHandler();
 private final Map> stopStates = new HashMap<>();
 private final List> states = new ArrayList<>();
+private final Map ignoreTimes = new HashMap<>();

Review Comment:
   ignoreTimes is not used



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java:
##
@@ -112,6 +112,7 @@ static  Queue 
deserializeComputationStates(
 long timestamp = timestampSerializer.deserialize(source);
 DeweyNumber version = versionSerializer.deserialize(source);
 long startTimestamp = timestampSerializer.deserialize(source);
+long previousTimestamp = timestampSerializer.deserialize(source);

Review Comment:
   ```suggestion
   long previousTimestamp = -1;
   ```
   
   This code path exists to migrate state from checkpoints/savepoints generated prior
to Flink 1.5, so I guess we cannot simply deserialize previousTimestamp from the source here.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java:
##
@@ -41,6 +41,9 @@ public class ComputationState {
 // Timestamp of the first element in the pattern
 private final long startTimestamp;
 
+// Timestamp of the previous element in the state

Review Comment:
   ```suggestion
   // Timestamp of the previous element of the pattern
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -414,10 +481,26 @@ public Pattern times(int from, int to) {
  * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
  */
 public Pattern timesOrMore(int times) {
+return timesOrMore(times, null);
+}
+
+/**
+ * Specifies that this pattern can occur the specified times at least with 
interval corresponds
+ * to the maximum time gap between previous and current event for each 
times. This means at
+ * least the specified times and at most infinite number of events can be 
matched to this
+ * pattern.
+ *
+ * @param times number of times at least matching event must appear
+ * @param windowTime time of the matching window between times
+ * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+ * applie

[GitHub] [flink] dianfu commented on a diff in pull request #20029: [FLINK-27392][cep] CEP Pattern supports definition of the maximum time gap between events

2022-06-22 Thread GitBox


dianfu commented on code in PR #20029:
URL: https://github.com/apache/flink/pull/20029#discussion_r904446329


##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -579,6 +662,16 @@ private void checkIfPreviousPatternGreedy() {
 }
 }
 
+private void checkWindowTimeBetweenEvents(Time windowTime, WithinType 
withinType) {
+if (WithinType.PREVIOUS_AND_CURRENT.equals(withinType)
+&& windowTimes.containsKey(WithinType.FIRST_AND_LAST)
+&& windowTime.toMilliseconds()
+> 
windowTimes.get(WithinType.FIRST_AND_LAST).toMilliseconds()) {
+throw new MalformedPatternException(
+"Window length between the previous and current event 
cannot be larger than which between the first and last event for pattern.");

Review Comment:
   ```suggestion
   "Window length between the previous and current event 
cannot be larger than the window length between the first and last event for 
pattern.");
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -348,10 +370,28 @@ public Pattern optional() {
  * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
  */
 public Pattern oneOrMore() {
+return oneOrMore(null);
+}
+
+/**
+ * Specifies that this pattern can occur {@code one or more} times and 
time interval corresponds
+ * to the maximum time gap between previous and current event for each 
times. This means at
+ * least one and at most infinite number of events can be matched to this 
pattern.
+ *
+ * If this quantifier is enabled for a pattern {@code 
A.oneOrMore().followedBy(B)} and a
+ * sequence of events {@code A1 A2 B} appears, this will generate 
patterns: {@code A1 B} and
+ * {@code A1 A2 B}. See also {@link #allowCombinations()}.
+ *
+ * @param windowTimes mapping between times and time of the matching 
window.
+ * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+ * applied.
+ * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+ */
+public Pattern oneOrMore(Map windowTimes) {

Review Comment:
   I guess *oneOrMore(Time windowTime)* is enough.



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java:
##
@@ -394,14 +447,28 @@ public Pattern times(int times) {
  * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
  */
 public Pattern times(int from, int to) {
+return times(from, to, null);
+}
+
+/**
+ * Specifies that the pattern can occur between from and to times with 
time interval corresponds
+ * to the maximum time gap between previous and current event for each 
times.
+ *
+ * @param from number of times matching event must appear at least
+ * @param to number of times matching event must appear at most
+ * @param windowTimes mapping between times and time of the matching 
window.
+ * @return The same pattern with the number of times range applied
+ * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+ */
+public Pattern times(int from, int to, Map 
windowTimes) {

Review Comment:
   ditto



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##
@@ -174,6 +182,7 @@ void compileFactory() {
 
 if (lastPattern.getQuantifier().getConsumingStrategy()
 == Quantifier.ConsumingStrategy.NOT_FOLLOW
+&& !windowTimes.containsKey(lastPattern.getName())

Review Comment:
   && (!windowTimes.containsKey(lastPattern.getName()) || 
windowTimes.get(lastPattern.getName()) <= 0)



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##
@@ -613,6 +643,7 @@ private Collection computeNextStates(
 int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
 int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() 
- 1);
 
+final long stateTimestamp = event.getTimestamp();

Review Comment:
   Move it into `case TAKE`?



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java:
##
@@ -41,6 +41,9 @@ public class ComputationState {
 // Timestamp of the first element in the pattern
 private final long startTimestamp;
 
+// Timestamp of the previous element in the state
+private final long stateTimestamp;

Review Comment:
   ```suggestion
   private final long previousTimestamp;
   ```



##
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##
@@ -91,6 +92,13 @@
  */
 private final Map> states;