[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi closed the pull request at: https://github.com/apache/flink/pull/6168 ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r197482701 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- Yes, you are right. After reading the code again, I see that the `TAKE` branch only puts a new `Node` into the `sharedBuffer`, and that node just points to the `previousNodeId`. This indeed doesn't affect the result of the current CS's partial match. I will take your suggestion. ---
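To illustrate the point made above, here is a minimal sketch of an append-only buffer in which each node points back to its predecessor. `AppendOnlyBufferSketch`, `put`, and `extract` are hypothetical names chosen for illustration; this is not Flink's `SharedBuffer`, only a toy model under the assumption that `TAKE` appends a node and never modifies existing ones.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical toy model, NOT Flink's SharedBuffer.
public class AppendOnlyBufferSketch {

    // A node stores its event and the id of the node it extends (-1 for none).
    private static final class Node {
        final String event;
        final int previousNodeId;

        Node(String event, int previousNodeId) {
            this.event = event;
            this.previousNodeId = previousNodeId;
        }
    }

    private final List<Node> nodes = new ArrayList<>();

    // Appends a node pointing at previousNodeId and returns the new node's id.
    public int put(String event, int previousNodeId) {
        nodes.add(new Node(event, previousNodeId));
        return nodes.size() - 1;
    }

    // Walks the back-pointers from nodeId and returns the events in order.
    public List<String> extract(int nodeId) {
        LinkedList<String> events = new LinkedList<>();
        for (int id = nodeId; id >= 0; id = nodes.get(id).previousNodeId) {
            events.addFirst(nodes.get(id).event);
        }
        return events;
    }

    public static void main(String[] args) {
        AppendOnlyBufferSketch buffer = new AppendOnlyBufferSketch();
        int a = buffer.put("a", -1);
        int b = buffer.put("b", a);   // the "current" computation state ends here
        buffer.put("c", b);           // a TAKE-like append after the current node
        System.out.println(buffer.extract(b)); // prints [a, b]
    }
}
```

Because extraction only follows back-pointers, a node appended after the current one is never visited when walking back from the current one.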
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r197461704 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- Yes, it is true that the shared buffer might change, but only "after" the CS. I mean those changes can't affect the partial match for the current Computation State. Therefore I would suggest removing the flag completely and just keeping the lazy evaluation of `matchedEvents` in `ConditionContext` by comparing it to null: if (matchedEvents == null) { this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState); } ---
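The null-check approach suggested above can be sketched roughly as follows. `LazyContextSketch` and `LazyMatches` are hypothetical stand-ins for `ConditionContext`, and the `Supplier` stands in for the call to `nfa.extractCurrentMatches(sharedBuffer, computationState)`; the extraction counter exists only to make the caching visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch, NOT Flink's ConditionContext.
public class LazyContextSketch {

    static final class LazyMatches<T> {
        private final Supplier<List<T>> extractor; // stands in for extractCurrentMatches
        private List<T> matchedEvents;             // null until first requested
        private int extractions;                   // illustration only

        LazyMatches(Supplier<List<T>> extractor) {
            this.extractor = extractor;
        }

        // Extracts on first access; later calls reuse the cached list.
        List<T> getMatchedEvents() {
            if (matchedEvents == null) {
                matchedEvents = extractor.get();
                extractions++;
            }
            return matchedEvents;
        }

        int extractionCount() {
            return extractions;
        }
    }

    public static void main(String[] args) {
        List<String> buffer = new ArrayList<>(Arrays.asList("a", "b"));
        LazyMatches<String> context = new LazyMatches<>(() -> new ArrayList<>(buffer));

        context.getMatchedEvents();   // first access triggers the extraction
        buffer.add("c");              // later change to the underlying buffer
        System.out.println(context.getMatchedEvents() + " " + context.extractionCount());
        // prints [a, b] 1
    }
}
```

No separate flag is needed here: the `null` field itself records whether the extraction has already happened, and a context that is never queried never pays for the extraction.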
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195792448 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- I thought it over again: the content of the `sharedBuffer` affects the result of `getEventsForPattern`, so the result should be updated when the `sharedBuffer` changes. But I think we only have to reset the `shouldUpdate` flag to `true` here rather than create a new context, right? @dawidwys ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195745115 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- I think the `sharedBuffer` has been changed during the `TAKE` branch, so the `conditionContext` should be different. ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195742430 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -609,7 +611,7 @@ public void close() throws Exception { startTimestamp); //check if newly created state is optional (have a PROCEED path to Final state) - final State finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState); + final State finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent()); --- End diff -- Why don't you use the one created in the beginning? ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195738359 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- agree ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195737755 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- I think it needs both the `conditionContext` and the `computationState`, and the `conditionContext` should replace the `sharedBuffer`. ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195719385 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- How about creating the context only once at the very beginning of computeNextStates? ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195720052 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { final Stack> states = new Stack<>(); states.push(state); + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- Rather than `computationState` just pass `conditionContext` to the `createDecisionGraph` method ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195719763 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -662,13 +662,13 @@ private void addComputationState( ComputationState computationState) { --- End diff -- Replace `ComputationState` with `ConditionContext` ---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6168#discussion_r195719361 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -662,13 +662,13 @@ private void addComputationState( ComputationState computationState) { final Stack> statesToCheck = new Stack<>(); statesToCheck.push(state); - + ConditionContext context = new ConditionContext(this, sharedBuffer, computationState); --- End diff -- How about creating the context only once at the very beginning of computeNextStates? ---
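The suggestion to create the context once at the start of `computeNextStates` and pass it down to the helpers could look roughly like the sketch below. The method names mirror those mentioned in the review, but the signatures, the `Context` class, and the toy conditions are all hypothetical simplifications, not the actual `NFA.java` code; the counter exists only to show that a single context is built per call.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification, NOT the actual NFA.java code.
public class SharedContextSketch {

    static int contextsCreated = 0; // illustration only

    // Stand-in for ConditionContext.
    static final class Context {
        final List<String> partialMatch;

        Context(List<String> partialMatch) {
            this.partialMatch = partialMatch;
            contextsCreated++;
        }
    }

    // Receives the context instead of building its own (toy condition).
    static boolean createDecisionGraph(Context context, String event) {
        return !context.partialMatch.contains(event);
    }

    // Receives the same context instance (toy condition).
    static boolean findFinalStateAfterProceed(Context context, String event) {
        return context.partialMatch.size() >= 2;
    }

    static String computeNextStates(List<String> partialMatch, String event) {
        Context context = new Context(partialMatch); // created once, at the very beginning
        boolean take = createDecisionGraph(context, event);
        boolean reachesFinal = findFinalStateAfterProceed(context, event);
        return "take=" + take + " final=" + reachesFinal;
    }

    public static void main(String[] args) {
        String result = computeNextStates(Arrays.asList("a", "b"), "c");
        System.out.println(result + " contexts=" + contextsCreated);
        // prints take=true final=true contexts=1
    }
}
```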