[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6059


---


2018-06-11 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194482707
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
 if (shouldDiscardPath) {
     // a stop state was reached in this branch. release branch which results in removing previous event from
     // the buffer
-    for (final ComputationState state : statesToRetain) {
-        eventSharedBuffer.release(
-            NFAStateNameHandler.getOriginalNameFromInternal(
-                state.getPreviousState().getName()),
-            state.getEvent(),
-            state.getTimestamp(),
-            state.getCounter());
+    for (final ComputationState state : statesToRetain) {
+        sharedBuffer.releaseNode(state.getPreviousBufferEntry());
     }
 } else {
     computationStates.addAll(statesToRetain);
 }
 
 }
 
-discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy);
-
-// prune shared buffer based on window length
-if (windowTime > 0L) {
-    long pruningTimestamp = timestamp - windowTime;
-
-    if (pruningTimestamp < timestamp) {
-        // the check is to guard against underflows
+discardComputationStatesAccordingToStrategy(
+    sharedBuffer, computationStates, result, afterMatchSkipStrategy);
 
-        // remove all elements which are expired
-        // with respect to the window length
-        if (eventSharedBuffer.prune(pruningTimestamp)) {
-            nfaChanged = true;
-        }
-    }
+if (event.getEvent() == null) {
--- End diff --

Agreed, I will add it as a separate commit.


---


2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194473454
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
[...]
+if (event.getEvent() == null) {
--- End diff --

Actually, I would recommend adding a method such as `nfa.advanceTime()` or 
`nfa.clearUpTo(timestamp)` and calling it from 
`AbstractKeyedCEPPatternOperator.advanceTime()` after calling `processEvent()`. 
This way it is clearer what it does and when it is called. What do you think?
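
A minimal sketch of what such a split could look like, assuming a heavily simplified NFA. `SimpleNfa`, its fields, and the pruning rule (drop entries strictly older than `timestamp - windowTime`) are hypothetical stand-ins for illustration, not Flink's actual classes; only the underflow guard mirrors the removed code in the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a toy NFA with an explicit time-advancing method. */
final class AdvanceTimeSketch {

    static final class SimpleNfa {
        private final long windowTime;
        // Timestamps of buffered events; stands in for the shared buffer.
        private final List<Long> bufferedTimestamps = new ArrayList<>();

        SimpleNfa(long windowTime) {
            this.windowTime = windowTime;
        }

        /** Handles a real element; watermarks no longer arrive here as null events. */
        void process(String event, long timestamp) {
            if (event == null) {
                throw new IllegalArgumentException("use advanceTime() to advance time");
            }
            bufferedTimestamps.add(timestamp);
        }

        /** Advances time only. Returns true if any buffered entry was pruned. */
        boolean advanceTime(long timestamp) {
            if (windowTime <= 0L) {
                return false;
            }
            long pruningTimestamp = timestamp - windowTime;
            // Guard against underflow: a wrapped-around value is not smaller than timestamp.
            if (pruningTimestamp >= timestamp) {
                return false;
            }
            return bufferedTimestamps.removeIf(t -> t < pruningTimestamp);
        }

        int bufferedCount() {
            return bufferedTimestamps.size();
        }
    }

    public static void main(String[] args) {
        SimpleNfa nfa = new SimpleNfa(10L);
        nfa.process("a", 1L);
        nfa.process("b", 15L);
        boolean changed = nfa.advanceTime(20L); // prunes entries with timestamp < 10
        System.out.println(changed + " " + nfa.bufferedCount());
    }
}
```

With this shape the caller decides explicitly when time advances, and `process` can reject `null` instead of interpreting it.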


---


2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194470370
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
[...]
+if (event.getEvent() == null) {
--- End diff --

I see. Then at least we should have a comment there explaining it. 


---


2018-06-11 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194469640
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
[...]
+if (event.getEvent() == null) {
--- End diff --

This is the unfortunate way we currently push a watermark into the NFA: a 
`null` event marks a watermark rather than a real element.


---


2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194468194
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
[...]
+if (event.getEvent() == null) {
--- End diff --

Why `== null` here? I may be missing something.


---


2018-06-04 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r192651450
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventWrapper.java ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Thin wrapper around user event that adds a lock.
+ *
+ * @param  user event type
+ */
+public class EventWrapper {
--- End diff --

As you are the second person who had doubts about that, I've 
restructured it a bit.


---


2018-06-02 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r192558813
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventWrapper.java ---
@@ -0,0 +1,188 @@
[...]
+/**
+ * Thin wrapper around user event that adds a lock.
+ *
+ * @param  user event type
+ */
+public class EventWrapper {
--- End diff --

Why does this class use the same name as the inner class in NFA.java? Is 
this intended? It is a little confusing.


---


2018-05-23 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6059

[Flink-9418] Migrate SharedBuffer to use MapState

## What is the purpose of the change

Migrate `SharedBuffer` to `MapState` in order to improve memory management and 
decrease the amount of data that is deserialized.
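
To illustrate the idea, here is a rough sketch under simplifying assumptions: a plain `HashMap` stands in for keyed map state, so each buffered event is an individual entry that can be read, locked, and released without touching the rest of the buffer. `SharedBufferSketch`, `LockedEvent`, and the method names are hypothetical and do not reflect the classes in this pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: per-entry storage with a reference count ("lock") per event. */
final class SharedBufferSketch {

    /** An event together with the number of references still pointing at it. */
    static final class LockedEvent<T> {
        final T event;
        int refCount = 1;

        LockedEvent(T event) {
            this.event = event;
        }
    }

    // Stand-in for map state: entries are addressed one at a time by id.
    private final Map<Long, LockedEvent<String>> entries = new HashMap<>();
    private long nextId = 0L;

    /** Stores an event with an initial reference count of one and returns its id. */
    long register(String event) {
        long id = nextId++;
        entries.put(id, new LockedEvent<>(event));
        return id;
    }

    /** Adds one more reference to an existing entry. */
    void lock(long id) {
        LockedEvent<String> entry = entries.get(id);
        if (entry == null) {
            throw new IllegalStateException("unknown entry " + id);
        }
        entry.refCount++;
    }

    /** Drops one reference; removes the entry once nothing refers to it. Returns true if removed. */
    boolean release(long id) {
        LockedEvent<String> entry = entries.get(id);
        if (entry == null) {
            return false;
        }
        entry.refCount--;
        if (entry.refCount == 0) {
            entries.remove(id);
            return true;
        }
        return false;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        SharedBufferSketch buffer = new SharedBufferSketch();
        long id = buffer.register("login");
        buffer.lock(id);                         // a second partial match refers to the event
        System.out.println(buffer.release(id));  // false: one reference remains
        System.out.println(buffer.release(id));  // true: entry removed
    }
}
```

In a real state backend the benefit comes from each `get`/`put` touching only one serialized entry rather than the whole buffer.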

It is based on #5960. Only the last two commits apply to this change.

## Verifying this change

This change is already covered by existing tests. Added a test that checks the 
number of state accesses, so that we can track whether performance degrades in 
the future.
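
One way such a check could be structured, sketched with hypothetical names: wrap the state in a counting decorator, run the operation under test, and compare the read and write counts against a fixed budget. `CountingState` and `increment` are assumptions for illustration and are not the test added in this pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: counting state accesses to catch accidental regressions. */
final class StateAccessCountingSketch {

    /** Map-backed state that records how often it is read and written. */
    static final class CountingState<K, V> {
        private final Map<K, V> delegate = new HashMap<>();
        private int reads;
        private int writes;

        V get(K key) {
            reads++;
            return delegate.get(key);
        }

        void put(K key, V value) {
            writes++;
            delegate.put(key, value);
        }

        int reads() {
            return reads;
        }

        int writes() {
            return writes;
        }
    }

    /** Operation under test: expected to cost exactly one read and one write. */
    static void increment(CountingState<String, Integer> state, String key) {
        Integer current = state.get(key);
        state.put(key, current == null ? 1 : current + 1);
    }

    public static void main(String[] args) {
        CountingState<String, Integer> state = new CountingState<>();
        for (int i = 0; i < 3; i++) {
            increment(state, "k");
        }
        // A test would fail here if a later change added extra accesses per call.
        System.out.println("reads=" + state.reads() + " writes=" + state.writes());
    }
}
```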


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-sharedbuffer-reworked

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6059.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6059


commit 849692eec7576264292a8feaf4f127ad6c47601d
Author: Aljoscha Krettek 
Date:   2018-02-07T12:55:11Z

[FLINK-8725] Separate state from NFA in CEP library

This also changes the serialization of state to not include the static
NFA parts and to also not include any user code.

commit 07f588678cf08422eb12d2ab415d9becc9273144
Author: Dawid Wysakowicz 
Date:   2018-05-04T13:41:27Z

Reverted backward compatibility with <=1.5

commit d3d11d9d4c474c2ff84183d8db8c77045a7771d9
Author: Dawid Wysakowicz 
Date:   2018-05-16T12:08:13Z

[FLINK-9418] Added SharedBuffer v2

commit 515ebba0d90722940e95542a5e9bab8cd9f22939
Author: Dawid Wysakowicz 
Date:   2018-05-23T08:04:37Z

[FLINK-9418] Switched to SharedBuffer v2




---