[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-22 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123439484
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 ---
@@ -36,24 +34,14 @@
private final StateTransitionAction action;
private final State sourceState;
private final State targetState;
-   private IterativeCondition newCondition;
-
-   /**
-* @deprecated  This field remains for backwards compatibility.
-* Now the conditions extend the {@link IterativeCondition}.
-*/
-   @Deprecated
-   private FilterFunction condition;
--- End diff --

I think this field needs to stay, though. It was introduced prior to 1.3.x, so it is 
needed for Java serialization, which is used for snapshots taken in 1.2.x.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-22 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123439602
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 ---
@@ -98,32 +82,9 @@ public int hashCode() {
 
@Override
public String toString() {
-   return new StringBuilder()
-   .append("StateTransition(")
-   .append(action).append(", ")
-   .append("from ").append(sourceState.getName())
-   .append("to ").append(targetState.getName())
-   .append(newCondition != null ? ", with 
condition)" : ")")
-   .toString();
-   }
-
-   /**
-* A wrapper to transform a {@link FilterFunction} into a {@link 
SimpleCondition}.
-* This is used only when migrating from an older Flink version.
-*/
-   private static class FilterWrapper extends SimpleCondition {
--- End diff --

Same as above.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123204588
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -126,6 +129,7 @@ public AbstractKeyedCEPPatternOperator(
this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+   this.conditionRegistry = 
nfaFactory.createNFA().getConditionRegistry();
--- End diff --

Yes, we can.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-21 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123192355
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -126,6 +129,7 @@ public AbstractKeyedCEPPatternOperator(
this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+   this.conditionRegistry = 
nfaFactory.createNFA().getConditionRegistry();
--- End diff --

I am OK with `NFA` holding a pointer to `conditionRegistry` for now.

What I meant concerns just this line: we call `createNFA()` and ignore 
the created `NFA`. The `nfaFactory` also has a pointer to the same 
`conditionRegistry`, so instead of `nfaFactory.createNFA().getConditionRegistry()` we 
could call `nfaFactory.getConditionRegistry()`. What do you think?
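
A minimal sketch of the suggestion above, using hypothetical stand-in classes (`SketchNFAFactory`, `SketchNFA`, `SketchConditionRegistry`) rather than the real Flink types. It only illustrates that a factory holding the shared registry can hand it out without building a throwaway NFA; the `created` counter exists purely for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the classes discussed in this thread.
final class SketchConditionRegistry {
    final Map<String, Object> conditions = new HashMap<>();
}

final class SketchNFA {
    private final SketchConditionRegistry registry;

    SketchNFA(SketchConditionRegistry registry) {
        this.registry = registry;
    }

    SketchConditionRegistry getConditionRegistry() {
        return registry;
    }
}

final class SketchNFAFactory {
    // One registry instance shared by the factory and every NFA it creates.
    private final SketchConditionRegistry registry = new SketchConditionRegistry();

    // Counts how many NFA instances were built (illustration only).
    int created = 0;

    SketchNFA createNFA() {
        created++;
        return new SketchNFA(registry);
    }

    // Proposed accessor: returns the shared registry without building an NFA.
    SketchConditionRegistry getConditionRegistry() {
        return registry;
    }
}
```

With such an accessor, the operator constructor would not need to create an `NFA` instance just to discard it.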




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123186831
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -1131,6 +1152,7 @@ private void serializeStates(Set> states, 
DataOutputView out) throws IO

nameSerializer.serialize(transition.getTargetState().getName(), out);

actionSerializer.serialize(transition.getAction(), out);
 
+   // backward compatibility

serializeCondition(transition.getCondition(), out);
--- End diff --

Thank you for the suggestion.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-21 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123184832
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -1131,6 +1152,7 @@ private void serializeStates(Set> states, 
DataOutputView out) throws IO

nameSerializer.serialize(transition.getTargetState().getName(), out);

actionSerializer.serialize(transition.getAction(), out);
 
+   // backward compatibility

serializeCondition(transition.getCondition(), out);
--- End diff --

Yes, but I think we could increase the VERSION field, and wrap this line in 
`if (readVersion == 1) {...}`
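
One way to read this suggestion, sketched with plain `java.io` streams. The class `VersionedTransitionFormat`, its layout, and the version numbers are assumptions made for illustration; this is not the actual NFA serializer. The point is that only the legacy layout (version 1) carries condition bytes, and the reader consumes them only when `readVersion == 1`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical format used only to illustrate version-gated (de)serialization.
final class VersionedTransitionFormat {
    static final int VERSION = 2; // assumed bump from 1

    // Writes a transition. Only the legacy layout (version 1) carries condition bytes.
    static byte[] write(int version, String target, String action, byte[] legacyCondition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeUTF(target);
            out.writeUTF(action);
            if (version == 1) {
                out.writeInt(legacyCondition.length);
                out.write(legacyCondition);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads either layout and returns "target/action"; legacy condition bytes are skipped.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int readVersion = in.readInt();
            String target = in.readUTF();
            String action = in.readUTF();
            if (readVersion == 1) {
                // Old snapshots still contain a serialized condition: consume and discard it.
                byte[] ignored = new byte[in.readInt()];
                in.readFully(ignored);
            }
            return target + "/" + action;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```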




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123140396
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 ---
@@ -36,6 +36,12 @@
private final StateTransitionAction action;
private final State sourceState;
private final State targetState;
+
+   /**
+* @deprecated  This field remains for backwards compatibility.
+* Now the condition is stored in {@link ConditionRegistry}.
+*/
+   @Deprecated
private IterativeCondition newCondition;
--- End diff --

I think so. I will check it again.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123139456
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -126,6 +129,7 @@ public AbstractKeyedCEPPatternOperator(
this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+   this.conditionRegistry = 
nfaFactory.createNFA().getConditionRegistry();
--- End diff --

I have tried that, but found it would touch too much code. We would need to 
change the `process` signature to add the `conditionRegistry` parameter, which 
requires changes in many places. Also, we use `NFACompiler.compile` to compile the given 
pattern into an NFA, but without a `conditionRegistry`; that's why I had to put 
`conditionRegistry` in the NFA.

This may look a bit weird, but I'm not sure which way is better.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r123138630
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -1131,6 +1152,7 @@ private void serializeStates(Set> states, 
DataOutputView out) throws IO

nameSerializer.serialize(transition.getTargetState().getName(), out);

actionSerializer.serialize(transition.getAction(), out);
 
+   // backward compatibility

serializeCondition(transition.getCondition(), out);
--- End diff --

I'm not sure. Do we need to be backward compatible with previous versions (like 
1.3.0, which serializes conditions)?




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122978426
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -1164,16 +1186,16 @@ private void serializeStates(Set> states, 
DataOutputView out) throws IO
String trgt = 
nameSerializer.deserialize(in);
StateTransitionAction action = 
actionSerializer.deserialize(in);
 
-   IterativeCondition condition = null;
try {
-   condition = 
deserializeCondition(in);
+   // backward compatibility
+   deserializeCondition(in);
--- End diff --

Just a suggestion/question, why not create an alternative path here?




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122982179
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 ---
@@ -36,6 +36,12 @@
private final StateTransitionAction action;
private final State sourceState;
private final State targetState;
+
+   /**
+* @deprecated  This field remains for backwards compatibility.
+* Now the condition is stored in {@link ConditionRegistry}.
+*/
+   @Deprecated
private IterativeCondition newCondition;
--- End diff --

Fields introduced prior to 1.3.0 had to be kept and marked `@Deprecated` because Java 
serialization was used for serializing the NFA. Since 1.3.0 a custom serialization 
is used, and this field was introduced in 1.3.0, so I think we can get rid of it 
completely. Am I right?




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122985049
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -126,6 +129,7 @@ public AbstractKeyedCEPPatternOperator(
this.isProcessingTime = 
Preconditions.checkNotNull(isProcessingTime);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+   this.conditionRegistry = 
nfaFactory.createNFA().getConditionRegistry();
--- End diff --

Maybe let's access `conditionRegistry` through `nfaFactory` rather than the NFA 
itself?

Anyway, I think we should consider removing one layer in 
`NFACompiler.java`, but I will create a separate JIRA for this.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122974349
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ConditionRegistry.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The {@link ConditionRegistry} is a registry to manage the mapping from 
a {@link StateTransition}
+ * to the {@link IterativeCondition}. A {@link StateTransition} is unique 
in a {@link NFA}, that's
+ * why we use {@link StateTransition} as the key.
+ */
+public class ConditionRegistry implements Serializable {
+   private static final long serialVersionUID = -4130291010425184830L;
+
+   private final Map> 
registeredConditions;
+
+   private final TransitionInfo reuse = new TransitionInfo();
+
+   /**
+* Creates a new condition registry.
+*/
+   public ConditionRegistry() {
+   this.registeredConditions = new HashMap<>();
--- End diff --

I personally prefer initializing it at the field declaration rather than in the 
constructor. It is easier to navigate. Anyway, it is already done this way for 
`TransitionInfo reuse`.
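
A small sketch of that style, with a hypothetical class and placeholder key and value types (not the real `ConditionRegistry`): the map is initialized where it is declared, so no constructor body is needed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified registry; key and value types are placeholders.
final class FieldInitializedRegistry {
    // Initialized at the declaration, so the class needs no explicit constructor.
    private final Map<String, String> registeredConditions = new HashMap<>();

    void register(String transition, String condition) {
        registeredConditions.put(transition, condition);
    }

    int size() {
        return registeredConditions.size();
    }
}
```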




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122982790
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 ---
@@ -68,6 +72,15 @@ public StateTransitionAction getAction() {
return sourceState;
}
 
+   public IterativeCondition getCondition(ConditionRegistry 
conditionRegistry) {
+   return conditionRegistry.getCondition(this);
+   }
+
+   /**
+* @deprecated  This field remains for backwards compatibility.
+* Now the condition getter should use {@link 
#getCondition(ConditionRegistry)}.
+*/
+   @Deprecated
public IterativeCondition getCondition() {
--- End diff --

I also think we could get rid of it.




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122978388
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -1131,6 +1152,7 @@ private void serializeStates(Set> states, 
DataOutputView out) throws IO

nameSerializer.serialize(transition.getTargetState().getName(), out);

actionSerializer.serialize(transition.getAction(), out);
 
+   // backward compatibility

serializeCondition(transition.getCondition(), out);
--- End diff --

Just a suggestion/question, why not create an alternative path here?




[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122929120
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ConditionRegistry.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The {@link ConditionRegistry} is a registry to manage the mapping from 
a {@link StateTransition}
+ * to the {@link IterativeCondition}. A {@link StateTransition} is unique 
in a {@link NFA}, that's
+ * why we use {@link StateTransition} as the key.
+ */
+public class ConditionRegistry implements Serializable {
+   private static final long serialVersionUID = -4130291010425184830L;
+
+   private final Map> 
registeredConditions;
+
+   private final TransitionInfo reuse = new TransitionInfo();
+
+   /**
+* Creates a new condition registry.
+*/
+   public ConditionRegistry() {
+   this.registeredConditions = new HashMap<>();
+   }
+
+   /**
+* Registers an {@link IterativeCondition} for the {@link 
StateTransition}.
+* @param transition the state transition
+* @param condition the condition to register, maybe null
+*/
+   public  void registerCondition(StateTransition transition, 
IterativeCondition condition) {
+   registeredConditions.put(new TransitionInfo(transition), 
condition);
+   }
+
+   /**
+* Gets the corresponding {@link IterativeCondition} with the given 
{@link StateTransition}.
+* @param transition the state transition
+* @return the corresponding {@link IterativeCondition}, maybe null.
+*/
+   public  IterativeCondition getCondition(StateTransition 
transition) {
+   reuse.update(transition);
+   //noinspection unchecked
+   return (IterativeCondition) registeredConditions.get(reuse);
+   }
+
+   /**
+* Opens every non-null conditions.
+*/
+   public void open() throws Exception {
+   for (IterativeCondition condition : 
registeredConditions.values()) {
+   if (condition != null) {
+   FunctionUtils.openFunction(condition, new 
Configuration());
+   }
+   }
+   }
+
+   /**
+* Sets the RuntimeContext for every non-null conditions.
+* @param context the runtime context
+*/
+   public void setRuntimeContext(RuntimeContext context) {
+   for (IterativeCondition condition : 
registeredConditions.values()) {
+   if (condition != null) {
+   
FunctionUtils.setFunctionRuntimeContext(condition, context);
+   }
+   }
+   }
+
+   private static class TransitionInfo implements Serializable {
+   private static final long serialVersionUID = 
-6446693486080356589L;
+
+   private StateTransitionAction action;
+   private String sourceStateName;
+   private String targetStateName;
+
+   TransitionInfo(){
+   }
+
+   TransitionInfo(StateTransition transition) {
+   this.action = transition.getAction();
--- End diff --

Yes, thanks. 



[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-20 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4145#discussion_r122918782
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ConditionRegistry.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The {@link ConditionRegistry} is a registry to manage the mapping from 
a {@link StateTransition}
+ * to the {@link IterativeCondition}. A {@link StateTransition} is unique 
in a {@link NFA}, that's
+ * why we use {@link StateTransition} as the key.
+ */
+public class ConditionRegistry implements Serializable {
+   private static final long serialVersionUID = -4130291010425184830L;
+
+   private final Map> 
registeredConditions;
+
+   private final TransitionInfo reuse = new TransitionInfo();
+
+   /**
+* Creates a new condition registry.
+*/
+   public ConditionRegistry() {
+   this.registeredConditions = new HashMap<>();
+   }
+
+   /**
+* Registers an {@link IterativeCondition} for the {@link 
StateTransition}.
+* @param transition the state transition
+* @param condition the condition to register, maybe null
+*/
+   public  void registerCondition(StateTransition transition, 
IterativeCondition condition) {
+   registeredConditions.put(new TransitionInfo(transition), 
condition);
+   }
+
+   /**
+* Gets the corresponding {@link IterativeCondition} with the given 
{@link StateTransition}.
+* @param transition the state transition
+* @return the corresponding {@link IterativeCondition}, maybe null.
+*/
+   public  IterativeCondition getCondition(StateTransition 
transition) {
+   reuse.update(transition);
+   //noinspection unchecked
+   return (IterativeCondition) registeredConditions.get(reuse);
+   }
+
+   /**
+* Opens every non-null conditions.
+*/
+   public void open() throws Exception {
+   for (IterativeCondition condition : 
registeredConditions.values()) {
+   if (condition != null) {
+   FunctionUtils.openFunction(condition, new 
Configuration());
+   }
+   }
+   }
+
+   /**
+* Sets the RuntimeContext for every non-null conditions.
+* @param context the runtime context
+*/
+   public void setRuntimeContext(RuntimeContext context) {
+   for (IterativeCondition condition : 
registeredConditions.values()) {
+   if (condition != null) {
+   
FunctionUtils.setFunctionRuntimeContext(condition, context);
+   }
+   }
+   }
+
+   private static class TransitionInfo implements Serializable {
+   private static final long serialVersionUID = 
-6446693486080356589L;
+
+   private StateTransitionAction action;
+   private String sourceStateName;
+   private String targetStateName;
+
+   TransitionInfo(){
+   }
+
+   TransitionInfo(StateTransition transition) {
+   this.action = transition.getAction();
--- End diff --

Could we use `update()` here?
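
A sketch of what this could look like. `TransitionKey` is a hypothetical stand-in for `TransitionInfo`, with the action simplified to a `String`; the signature of `update` is assumed, since the quoted diff is cut off before that method. The constructor delegates to `update()`, so the field assignments live in one place.

```java
import java.util.Objects;

// Hypothetical stand-in for TransitionInfo; the action is simplified to a String.
final class TransitionKey {
    private String action;
    private String sourceStateName;
    private String targetStateName;

    // Empty key, intended to be filled via update() and reused for lookups.
    TransitionKey() {
    }

    // Delegates to update() instead of repeating the assignments.
    TransitionKey(String action, String sourceStateName, String targetStateName) {
        update(action, sourceStateName, targetStateName);
    }

    void update(String action, String sourceStateName, String targetStateName) {
        this.action = action;
        this.sourceStateName = sourceStateName;
        this.targetStateName = targetStateName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TransitionKey)) {
            return false;
        }
        TransitionKey other = (TransitionKey) o;
        return Objects.equals(action, other.action)
            && Objects.equals(sourceStateName, other.sourceStateName)
            && Objects.equals(targetStateName, other.targetStateName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(action, sourceStateName, targetStateName);
    }
}
```

Because the class is final, calling `update()` from the constructor cannot dispatch to an overriding method.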



[GitHub] flink pull request #4145: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...

2017-06-19 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4145

[FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state 
and support RichFunction interface

The core idea is that each `StateTransition` is unique in an NFA graph. So we 
store the conditions in a map from `StateTransition` to 
`IterativeCondition`, so that the conditions are not serialized with the NFA state. If I 
missed something, please point it out.
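
The idea can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `TransitionConditionMap` is a hypothetical name, `Predicate<Integer>` stands in for `IterativeCondition`, and the key is a list of the three values assumed to identify a transition (action, source state name, target state name).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical registry kept outside the serialized NFA state.
final class TransitionConditionMap {
    // Key: [action, source state name, target state name]; lists compare by content.
    private final Map<List<String>, Predicate<Integer>> conditions = new HashMap<>();

    void register(String action, String source, String target, Predicate<Integer> condition) {
        conditions.put(Arrays.asList(action, source, target), condition);
    }

    // Returns the registered condition, or null if the transition has none.
    Predicate<Integer> lookup(String action, String source, String target) {
        return conditions.get(Arrays.asList(action, source, target));
    }
}
```

The serialized state then only needs the transition's identifying values; the condition itself is looked up at runtime.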

This PR also includes FLINK-6938: IterativeCondition supports RichFunction 
interface. 


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink IterativeCondition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4145.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4145


commit dfd00d0b442b415c43120e31e94a2d5dba5c02c2
Author: Jark Wu 
Date:   2017-06-20T06:02:21Z

[FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state 
and support RichFunction interface



