[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-07-08 Thread Aitozi
Github user Aitozi closed the pull request at:

https://github.com/apache/flink/pull/6168


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-22 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r197482701
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

Yes, you are right. Read the code again,  the `TAKE` branch only put the 
new `Node` to sharedBuffer which just point to the previousNodeId, This indeed 
don't affect the result of the current CS's partial match. I will take your 
suggestion


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-22 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r197461704
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

Yes that is true the shared buffer might change but only "after" the CS. I 
mean those changes can't affect partial match for current Computation State. 
Therefore I would suggest to remove the flag completely and just keep the lazy 
evaluation of `matchedEvents` in `ConditionContext` by comparing it to null:

if (matchedEvents == null) {
this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, 
computationState);
}



---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195792448
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

I think over it again, the content of the `sharedBuffer` make difference  
to  the result of the `getEventsForPattern`, so the result should be update 
with the change of the `sharedBuffer`. But i think we only have to reset the 
`shouldUpdate` flag to `true` here rather than create a  context again, right? 
@dawidwys 


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195745115
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

I think the sharedbuffer has been changed during the `TAKE` branch, the 
conditionContext should be different.


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195742430
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
 
//check if newly created state is 
optional (have a PROCEED path to Final state)
-   final State finalState = 
findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), 
computationState);
+   final State finalState = 
findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, 
computationState), nextState, event.getEvent());
--- End diff --

Why don't you use the one created in the beginning?


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195738359
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int 
ignoreBranches, int takeBranches) {
 
final Stack> states = new Stack<>();
states.push(state);
+   ConditionContext context = new ConditionContext(this, 
sharedBuffer, computationState);
--- End diff --

agree


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195737755
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int 
ignoreBranches, int takeBranches) {
 
final Stack> states = new Stack<>();
states.push(state);
+   ConditionContext context = new ConditionContext(this, 
sharedBuffer, computationState);
--- End diff --

I think it needs the `conditionContext` and `computationState` and should 
replace the `sharedBuffer` with `conditionContext`.


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195719385
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int 
ignoreBranches, int takeBranches) {
 
final Stack> states = new Stack<>();
states.push(state);
+   ConditionContext context = new ConditionContext(this, 
sharedBuffer, computationState);
--- End diff --

How about creating the context only once at the very beginning of 
computeNextStates?


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195720052
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int 
ignoreBranches, int takeBranches) {
 
final Stack> states = new Stack<>();
states.push(state);
+   ConditionContext context = new ConditionContext(this, 
sharedBuffer, computationState);
--- End diff --

Rather than `computationState` just pass `conditionContext` to the 
`createDecisionGraph` method


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195719763
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -662,13 +662,13 @@ private void addComputationState(
ComputationState computationState) {
--- End diff --

Replace `ComputationState` with `ConditionContext`


---


[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

2018-06-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6168#discussion_r195719361
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -662,13 +662,13 @@ private void addComputationState(
ComputationState computationState) {
final Stack> statesToCheck = new Stack<>();
statesToCheck.push(state);
-
+   ConditionContext context = new ConditionContext(this, 
sharedBuffer, computationState);
--- End diff --

How about creating the context only once at the very beginning of 
computeNextStates?


---