dawidwys closed pull request #4172: [FLINK-6983] [cep] Do not serialize States 
with NFA
URL: https://github.com/apache/flink/pull/4172
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its
original diff visible after the merge, so the full diff is reproduced below:

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a6c5bdeba2f..a1b251cd65e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -116,27 +116,27 @@
        @Deprecated
        private final NonDuplicatingTypeSerializer<T> 
nonDuplicatingTypeSerializer;
 
-       //////////////////                      End of Backwards Compatibility 
Fields                   //////////////////
-
        /**
-        * A set of all the valid NFA states, as returned by the
-        * {@link NFACompiler NFACompiler}.
-        * These are directly derived from the user-specified pattern.
+        * @deprecated Used only for backwards compatibility.
         */
+       @Deprecated
        private Set<State<T>> states;
 
        /**
-        * The length of a windowed pattern, as specified using the
-        * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  
Pattern.within(Time)}
-        * method.
+        * @deprecated Used only for backwards compatibility.
         */
-       private final long windowTime;
+       @Deprecated
+       private long windowTime;
 
        /**
-        * A flag indicating if we want timed-out patterns (in case of windowed 
patterns)
-        * to be emitted ({@code true}), or silently discarded ({@code false}).
+        * @deprecated Used only for backwards compatibility.
         */
-       private final boolean handleTimeout;
+       @Deprecated
+       private boolean handleTimeout;
+
+       //////////////////                      End of Backwards Compatibility 
Fields                   //////////////////
+
+       private transient MetaStates<T> metaStates;
 
        /**
         * Current set of {@link ComputationState computation states} within 
the state machine.
@@ -159,15 +159,17 @@ public NFA(
 
                this.eventSerializer = eventSerializer;
                this.nonDuplicatingTypeSerializer = new 
NonDuplicatingTypeSerializer<>(eventSerializer);
-               this.windowTime = windowTime;
-               this.handleTimeout = handleTimeout;
+               this.metaStates = new MetaStates<>(windowTime, handleTimeout);
                this.eventSharedBuffer = new 
SharedBuffer<>(nonDuplicatingTypeSerializer);
                this.computationStates = new LinkedList<>();
-               this.states = new HashSet<>();
+       }
+
+       public MetaStates<T> getMetaStates() {
+               return metaStates;
        }
 
        public Set<State<T>> getStates() {
-               return states;
+               return metaStates.states;
        }
 
        public void addStates(final Collection<State<T>> newStates) {
@@ -177,7 +179,7 @@ public void addStates(final Collection<State<T>> newStates) 
{
        }
 
        public void addState(final State<T> state) {
-               states.add(state);
+               metaStates.states.add(state);
 
                if (state.isStart()) {
                        
computationStates.add(ComputationState.createStartState(this, state));
@@ -221,10 +223,10 @@ public boolean isEmpty() {
                        final Collection<ComputationState<T>> 
newComputationStates;
 
                        if (!computationState.isStartState() &&
-                               windowTime > 0L &&
-                               timestamp - 
computationState.getStartTimestamp() >= windowTime) {
+                               metaStates.windowTime > 0L &&
+                               timestamp - 
computationState.getStartTimestamp() >= metaStates.windowTime) {
 
-                               if (handleTimeout) {
+                               if (metaStates.handleTimeout) {
                                        // extract the timed out event pattern
                                        Map<String, List<T>> timedoutPattern = 
extractCurrentMatches(computationState);
                                        
timeoutResult.add(Tuple2.of(timedoutPattern, timestamp));
@@ -294,8 +296,8 @@ public boolean isEmpty() {
                }
 
                // prune shared buffer based on window length
-               if (windowTime > 0L) {
-                       long pruningTimestamp = timestamp - windowTime;
+               if (metaStates.windowTime > 0L) {
+                       long pruningTimestamp = timestamp - 
metaStates.windowTime;
 
                        if (pruningTimestamp < timestamp) {
                                // the check is to guard against underflows
@@ -317,8 +319,8 @@ public boolean equals(Object obj) {
 
                        return 
nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
                                
eventSharedBuffer.equals(other.eventSharedBuffer) &&
-                               states.equals(other.states) &&
-                               windowTime == other.windowTime;
+                               
metaStates.states.equals(other.metaStates.states) &&
+                               metaStates.windowTime == 
other.metaStates.windowTime;
                } else {
                        return false;
                }
@@ -326,7 +328,11 @@ public boolean equals(Object obj) {
 
        @Override
        public int hashCode() {
-               return Objects.hash(nonDuplicatingTypeSerializer, 
eventSharedBuffer, states, windowTime);
+               return Objects.hash(
+                       nonDuplicatingTypeSerializer,
+                       eventSharedBuffer,
+                       metaStates.states,
+                       metaStates.windowTime);
        }
 
        private static <T> boolean isEquivalentState(final State<T> s1, final 
State<T> s2) {
@@ -788,8 +794,8 @@ public boolean apply(@Nullable State<T> input) {
                        convertedStates.get(startName),
                        new DeweyNumber(this.startEventCounter)));
 
-               this.states.clear();
-               this.states.addAll(convertedStates.values());
+               this.metaStates = new MetaStates<T>(this.windowTime, 
this.handleTimeout);
+               this.metaStates.states.addAll(convertedStates.values());
 
                return computationStates;
        }
@@ -847,7 +853,493 @@ public int getVersion() {
        }
 
        /**
-        * A {@link TypeSerializer} for {@link NFA} that uses Java 
Serialization.
+        * A {@link TypeSerializer} for {@link NFA} that uses Flink's custom 
Serialization.
+        */
+       public static class NFASerializerV2<T> extends TypeSerializer<NFA<T>> {
+
+               private static final long serialVersionUID = 
2098282423980597010L;
+
+               private final TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer;
+
+               private final TypeSerializer<T> eventSerializer;
+
+               private final MetaStates<T> metaStates;
+
+               public NFASerializerV2(TypeSerializer<T> typeSerializer, 
MetaStates<T> metaStates) {
+                       this(
+                               typeSerializer,
+                               new 
SharedBuffer.SharedBufferSerializer<>(StringSerializer.INSTANCE, 
typeSerializer),
+                               metaStates);
+               }
+
+               public NFASerializerV2(
+                               TypeSerializer<T> typeSerializer,
+                               TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer,
+                               MetaStates<T> metaStates) {
+                       this.eventSerializer = typeSerializer;
+                       this.sharedBufferSerializer = sharedBufferSerializer;
+                       this.metaStates = metaStates;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<NFA<T>> duplicate() {
+                       return this;
+               }
+
+               @Override
+               public NFA<T> createInstance() {
+                       return null;
+               }
+
+               @Override
+               public NFA<T> copy(NFA<T> from) {
+                       try {
+                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                               ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                               serialize(from, new 
DataOutputViewStreamWrapper(oos));
+
+                               oos.close();
+                               baos.close();
+
+                               byte[] data = baos.toByteArray();
+
+                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                               ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                               @SuppressWarnings("unchecked")
+                               NFA<T> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
+                               ois.close();
+                               bais.close();
+                               return copy;
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not copy 
NFA.", e);
+                       }
+               }
+
+               @Override
+               public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public void serialize(NFA<T> record, DataOutputView target) 
throws IOException {
+                       
sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
+
+                       target.writeInt(record.computationStates.size());
+
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       for (ComputationState<T> computationState: 
record.computationStates) {
+                               
stateNameSerializer.serialize(computationState.getState().getName(), target);
+                               
stateNameSerializer.serialize(computationState.getPreviousState() == null
+                                               ? null : 
computationState.getPreviousState().getName(), target);
+
+                               
timestampSerializer.serialize(computationState.getTimestamp(), target);
+                               
versionSerializer.serialize(computationState.getVersion(), target);
+                               
timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+                               target.writeInt(computationState.getCounter());
+
+                               if (computationState.getEvent() == null) {
+                                       target.writeBoolean(false);
+                               } else {
+                                       target.writeBoolean(true);
+                                       
eventSerializer.serialize(computationState.getEvent(), target);
+                               }
+                       }
+               }
+
+               @Override
+               public NFA<T> deserialize(DataInputView source) throws 
IOException {
+                       NFA<T> nfa = new NFA<>(eventSerializer, 
metaStates.windowTime, metaStates.handleTimeout);
+                       nfa.metaStates.states = metaStates.states;
+
+                       nfa.eventSharedBuffer = 
sharedBufferSerializer.deserialize(source);
+
+                       Queue<ComputationState<T>> computationStates = new 
LinkedList<>();
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       int computationStateNo = source.readInt();
+                       for (int i = 0; i < computationStateNo; i++) {
+                               State<T> state = 
getStateByName(stateNameSerializer.deserialize(source), nfa);
+                               State<T> prevState = 
getStateByName(stateNameSerializer.deserialize(source), nfa);
+                               long timestamp = 
timestampSerializer.deserialize(source);
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+                               long startTimestamp = 
timestampSerializer.deserialize(source);
+                               int counter = source.readInt();
+
+                               T event = null;
+                               if (source.readBoolean()) {
+                                       event = 
eventSerializer.deserialize(source);
+                               }
+
+                               
computationStates.add(ComputationState.createState(
+                                               nfa, state, prevState, event, 
counter, timestamp, version, startTimestamp));
+                       }
+
+                       nfa.computationStates = computationStates;
+                       return nfa;
+               }
+
+               private State<T> getStateByName(String name, NFA<T> nfa) {
+                       for (State<T> state: nfa.metaStates.states) {
+                               if (state.getName().equals(name)) {
+                                       return state;
+                               }
+                       }
+                       return null;
+               }
+
+               @Override
+               public NFA<T> deserialize(NFA<T> reuse, DataInputView source) 
throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       SharedBuffer<String, T> sharedBuffer = 
sharedBufferSerializer.deserialize(source);
+                       sharedBufferSerializer.serialize(sharedBuffer, target);
+
+                       StringSerializer stateNameSerializer = 
StringSerializer.INSTANCE;
+                       LongSerializer timestampSerializer = 
LongSerializer.INSTANCE;
+                       DeweyNumber.DeweyNumberSerializer versionSerializer = 
new DeweyNumber.DeweyNumberSerializer();
+
+                       int computationStateNo = source.readInt();
+                       target.writeInt(computationStateNo);
+
+                       for (int i = 0; i < computationStateNo; i++) {
+                               String stateName = 
stateNameSerializer.deserialize(source);
+                               stateNameSerializer.serialize(stateName, 
target);
+
+                               String prevStateName = 
stateNameSerializer.deserialize(source);
+                               stateNameSerializer.serialize(prevStateName, 
target);
+
+                               long timestamp = 
timestampSerializer.deserialize(source);
+                               timestampSerializer.serialize(timestamp, 
target);
+
+                               DeweyNumber version = 
versionSerializer.deserialize(source);
+                               versionSerializer.serialize(version, target);
+
+                               long startTimestamp = 
timestampSerializer.deserialize(source);
+                               timestampSerializer.serialize(startTimestamp, 
target);
+
+                               int counter = source.readInt();
+                               target.writeInt(counter);
+
+                               boolean hasEvent = source.readBoolean();
+                               target.writeBoolean(hasEvent);
+                               if (hasEvent) {
+                                       T event = 
eventSerializer.deserialize(source);
+                                       eventSerializer.serialize(event, 
target);
+                               }
+                       }
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj == this ||
+                                       (obj != null && 
obj.getClass().equals(getClass()) &&
+                                                       
sharedBufferSerializer.equals(((NFASerializerV2) obj).sharedBufferSerializer) &&
+                                                       
eventSerializer.equals(((NFASerializerV2) obj).eventSerializer));
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       return 37 * sharedBufferSerializer.hashCode() + 
eventSerializer.hashCode();
+               }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return new 
NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
+               }
+
+               @Override
+               public CompatibilityResult<NFA<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       if (configSnapshot instanceof 
NFASerializerConfigSnapshot) {
+                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs =
+                                               ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                               CompatibilityResult<T> eventCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                                               serializersAndConfigs.get(0).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               serializersAndConfigs.get(0).f1,
+                                               eventSerializer);
+
+                               CompatibilityResult<SharedBuffer<String, T>> 
sharedBufCompatResult =
+                                               
CompatibilityUtil.resolveCompatibilityResult(
+                                                               
serializersAndConfigs.get(1).f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
serializersAndConfigs.get(1).f1,
+                                                               
sharedBufferSerializer);
+
+                               if 
(!sharedBufCompatResult.isRequiresMigration() && 
!eventCompatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else {
+                                       if 
(eventCompatResult.getConvertDeserializer() != null &&
+                                                       
sharedBufCompatResult.getConvertDeserializer() != null) {
+                                               return 
CompatibilityResult.requiresMigration(
+                                                               new 
NFASerializerV2<>(
+                                                                               
new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
+                                                                               
new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer()),
+                                                                               
metaStates));
+                                       }
+                               }
+                       }
+
+                       return CompatibilityResult.requiresMigration();
+               }
+       }
+
+       public static class MetaStates<T> implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               MetaStates(final long windowTime, final boolean handleTimeout) {
+                       this(windowTime, handleTimeout, new 
HashSet<State<T>>());
+               }
+
+               MetaStates(final long windowTime, final boolean handleTimeout, 
final Set<State<T>> states) {
+                       this.windowTime = windowTime;
+                       this.handleTimeout = handleTimeout;
+                       this.states = states;
+               }
+
+               /**
+                * A set of all the valid NFA states, as returned by the
+                * {@link NFACompiler NFACompiler}.
+                * These are directly derived from the user-specified pattern.
+                */
+               private Set<State<T>> states;
+
+               /**
+                * The length of a windowed pattern, as specified using the
+                * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  
Pattern.within(Time)}
+                * method.
+                */
+               private final long windowTime;
+
+               /**
+                * A flag indicating if we want timed-out patterns (in case of 
windowed patterns)
+                * to be emitted ({@code true}), or silently discarded ({@code 
false}).
+                */
+               private final boolean handleTimeout;
+
+               public static class MetaStatesSerializer<T> extends 
TypeSerializerSingleton<MetaStates<T>> {
+
+                       @Override
+                       public boolean isImmutableType() {
+                               return false;
+                       }
+
+                       @Override
+                       public MetaStates<T> createInstance() {
+                               return null;
+                       }
+
+                       @Override
+                       public MetaStates<T> copy(MetaStates<T> from) {
+                               try {
+                                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                                       ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                                       serialize(from, new 
DataOutputViewStreamWrapper(oos));
+
+                                       oos.close();
+                                       baos.close();
+
+                                       byte[] data = baos.toByteArray();
+
+                                       ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                                       ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                                       @SuppressWarnings("unchecked")
+                                       MetaStates<T> copy = deserialize(new 
DataInputViewStreamWrapper(ois));
+                                       ois.close();
+                                       bais.close();
+                                       return copy;
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Could not 
copy NFA.", e);
+                               }
+                       }
+
+                       @Override
+                       public MetaStates<T> copy(MetaStates<T> from, 
MetaStates<T> reuse) {
+                               return copy(from);
+                       }
+
+                       @Override
+                       public int getLength() {
+                               return -1;
+                       }
+
+                       @Override
+                       public void serialize(MetaStates<T> record, 
DataOutputView target) throws IOException {
+                               serializeStates(record.states, target);
+                               target.writeLong(record.windowTime);
+                               target.writeBoolean(record.handleTimeout);
+                       }
+
+                       @Override
+                       public MetaStates<T> deserialize(DataInputView source) 
throws IOException {
+                               Set<State<T>> states = 
deserializeStates(source);
+                               long windowTime = source.readLong();
+                               boolean handleTimeout = source.readBoolean();
+                               return new MetaStates<>(windowTime, 
handleTimeout, states);
+                       }
+
+                       @Override
+                       public MetaStates<T> deserialize(MetaStates<T> reuse, 
DataInputView source) throws IOException {
+                               return deserialize(source);
+                       }
+
+                       @Override
+                       public void copy(DataInputView source, DataOutputView 
target) throws IOException {
+                               Set<State<T>> states = 
deserializeStates(source);
+                               serializeStates(states, target);
+
+                               long windowTime = source.readLong();
+                               target.writeLong(windowTime);
+
+                               boolean handleTimeout = source.readBoolean();
+                               target.writeBoolean(handleTimeout);
+                       }
+
+                       @Override
+                       public boolean canEqual(Object obj) {
+                               return true;
+                       }
+
+                       private void serializeStates(Set<State<T>> states, 
DataOutputView out) throws IOException {
+                               TypeSerializer<String> nameSerializer = 
StringSerializer.INSTANCE;
+                               TypeSerializer<State.StateType> 
stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+                               TypeSerializer<StateTransitionAction> 
actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+                               out.writeInt(states.size());
+                               for (State<T> state: states) {
+                                       
nameSerializer.serialize(state.getName(), out);
+                                       
stateTypeSerializer.serialize(state.getStateType(), out);
+                               }
+
+                               for (State<T> state: states) {
+                                       
nameSerializer.serialize(state.getName(), out);
+
+                                       
out.writeInt(state.getStateTransitions().size());
+                                       for (StateTransition<T> transition : 
state.getStateTransitions()) {
+                                               
nameSerializer.serialize(transition.getSourceState().getName(), out);
+                                               
nameSerializer.serialize(transition.getTargetState().getName(), out);
+                                               
actionSerializer.serialize(transition.getAction(), out);
+
+                                               
serializeCondition(transition.getCondition(), out);
+                                       }
+                               }
+                       }
+
+                       private Set<State<T>> deserializeStates(DataInputView 
in) throws IOException {
+                               TypeSerializer<String> nameSerializer = 
StringSerializer.INSTANCE;
+                               TypeSerializer<State.StateType> 
stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+                               TypeSerializer<StateTransitionAction> 
actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+                               final int noOfStates = in.readInt();
+                               Map<String, State<T>> states = new 
HashMap<>(noOfStates);
+
+                               for (int i = 0; i < noOfStates; i++) {
+                                       String stateName = 
nameSerializer.deserialize(in);
+                                       State.StateType stateType = 
stateTypeSerializer.deserialize(in);
+
+                                       State<T> state = new State<>(stateName, 
stateType);
+                                       states.put(stateName, state);
+                               }
+
+                               for (int i = 0; i < noOfStates; i++) {
+                                       String srcName = 
nameSerializer.deserialize(in);
+
+                                       int noOfTransitions = in.readInt();
+                                       for (int j = 0; j < noOfTransitions; 
j++) {
+                                               String src = 
nameSerializer.deserialize(in);
+                                               
Preconditions.checkState(src.equals(srcName),
+                                                                               
        "Source Edge names do not match (" + srcName + " - " + src + ").");
+
+                                               String trgt = 
nameSerializer.deserialize(in);
+                                               StateTransitionAction action = 
actionSerializer.deserialize(in);
+
+                                               IterativeCondition<T> condition 
= null;
+                                               try {
+                                                       condition = 
deserializeCondition(in);
+                                               } catch (ClassNotFoundException 
e) {
+                                                       e.printStackTrace();
+                                               }
+
+                                               State<T> srcState = 
states.get(src);
+                                               State<T> trgtState = 
states.get(trgt);
+                                               
srcState.addStateTransition(action, trgtState, condition);
+                                       }
+
+                               }
+                               return new HashSet<>(states.values());
+                       }
+
+                       private void serializeCondition(IterativeCondition<T> 
condition, DataOutputView out) throws IOException {
+                               out.writeBoolean(condition != null);
+                               if (condition != null) {
+                                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                                       ObjectOutputStream oos = new 
ObjectOutputStream(baos);
+
+                                       oos.writeObject(condition);
+
+                                       oos.close();
+                                       baos.close();
+
+                                       byte[] serCondition = 
baos.toByteArray();
+                                       out.writeInt(serCondition.length);
+                                       out.write(serCondition);
+                               }
+                       }
+
+                       private IterativeCondition<T> 
deserializeCondition(DataInputView in) throws IOException, 
ClassNotFoundException {
+                               boolean hasCondition = in.readBoolean();
+                               if (hasCondition) {
+                                       int length = in.readInt();
+
+                                       byte[] serCondition = new byte[length];
+                                       in.readFully(serCondition);
+
+                                       ByteArrayInputStream bais = new 
ByteArrayInputStream(serCondition);
+                                       ObjectInputStream ois = new 
ObjectInputStream(bais);
+
+                                       IterativeCondition<T> condition = 
(IterativeCondition<T>) ois.readObject();
+                                       ois.close();
+                                       bais.close();
+
+                                       return condition;
+                               }
+                               return null;
+                       }
+               }
+       }
+
+       //////////////////                      Old Serialization               
        //////////////////////
+
+       /**
+        * A {@link TypeSerializer} for {@link NFA} that uses Flink's custom 
Serialization.
         */
        public static class NFASerializer<T> extends TypeSerializer<NFA<T>> {
 
@@ -862,8 +1354,8 @@ public NFASerializer(TypeSerializer<T> typeSerializer) {
                }
 
                public NFASerializer(
-                               TypeSerializer<T> typeSerializer,
-                               TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer) {
+                       TypeSerializer<T> typeSerializer,
+                       TypeSerializer<SharedBuffer<String, T>> 
sharedBufferSerializer) {
                        this.eventSerializer = typeSerializer;
                        this.sharedBufferSerializer = sharedBufferSerializer;
                }
@@ -936,7 +1428,7 @@ public void serialize(NFA<T> record, DataOutputView 
target) throws IOException {
                        for (ComputationState<T> computationState: 
record.computationStates) {
                                
stateNameSerializer.serialize(computationState.getState().getName(), target);
                                
stateNameSerializer.serialize(computationState.getPreviousState() == null
-                                               ? null : 
computationState.getPreviousState().getName(), target);
+                                                                     ? null : 
computationState.getPreviousState().getName(), target);
 
                                
timestampSerializer.serialize(computationState.getTimestamp(), target);
                                
versionSerializer.serialize(computationState.getVersion(), target);
@@ -983,7 +1475,7 @@ public void serialize(NFA<T> record, DataOutputView 
target) throws IOException {
                                }
 
                                
computationStates.add(ComputationState.createState(
-                                               nfa, state, prevState, event, 
counter, timestamp, version, startTimestamp));
+                                       nfa, state, prevState, event, counter, 
timestamp, version, startTimestamp));
                        }
 
                        nfa.computationStates = computationStates;
@@ -1056,9 +1548,9 @@ public void copy(DataInputView source, DataOutputView 
target) throws IOException
                @Override
                public boolean equals(Object obj) {
                        return obj == this ||
-                                       (obj != null && 
obj.getClass().equals(getClass()) &&
-                                                       
sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
-                                                       
eventSerializer.equals(((NFASerializer) obj).eventSerializer));
+                              (obj != null && 
obj.getClass().equals(getClass()) &&
+                               sharedBufferSerializer.equals(((NFASerializer) 
obj).sharedBufferSerializer) &&
+                               eventSerializer.equals(((NFASerializer) 
obj).eventSerializer));
                }
 
                @Override
@@ -1080,30 +1572,30 @@ public TypeSerializerConfigSnapshot 
snapshotConfiguration() {
                public CompatibilityResult<NFA<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                        if (configSnapshot instanceof 
NFASerializerConfigSnapshot) {
                                List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> serializersAndConfigs =
-                                               ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+                                       ((NFASerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
 
                                CompatibilityResult<T> eventCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                               serializersAndConfigs.get(0).f0,
-                                               
UnloadableDummyTypeSerializer.class,
-                                               serializersAndConfigs.get(0).f1,
-                                               eventSerializer);
+                                       serializersAndConfigs.get(0).f0,
+                                       UnloadableDummyTypeSerializer.class,
+                                       serializersAndConfigs.get(0).f1,
+                                       eventSerializer);
 
                                CompatibilityResult<SharedBuffer<String, T>> 
sharedBufCompatResult =
-                                               
CompatibilityUtil.resolveCompatibilityResult(
-                                                               
serializersAndConfigs.get(1).f0,
-                                                               
UnloadableDummyTypeSerializer.class,
-                                                               
serializersAndConfigs.get(1).f1,
-                                                               
sharedBufferSerializer);
+                                       
CompatibilityUtil.resolveCompatibilityResult(
+                                               serializersAndConfigs.get(1).f0,
+                                               
UnloadableDummyTypeSerializer.class,
+                                               serializersAndConfigs.get(1).f1,
+                                               sharedBufferSerializer);
 
                                if 
(!sharedBufCompatResult.isRequiresMigration() && 
!eventCompatResult.isRequiresMigration()) {
                                        return CompatibilityResult.compatible();
                                } else {
                                        if 
(eventCompatResult.getConvertDeserializer() != null &&
-                                                       
sharedBufCompatResult.getConvertDeserializer() != null) {
+                                           
sharedBufCompatResult.getConvertDeserializer() != null) {
                                                return 
CompatibilityResult.requiresMigration(
-                                                               new 
NFASerializer<>(
-                                                                               
new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
-                                                                               
new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
+                                                       new NFASerializer<>(
+                                                               new 
TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
+                                                               new 
TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
                                        }
                                }
                        }
@@ -1159,7 +1651,7 @@ private void serializeStates(Set<State<T>> states, 
DataOutputView out) throws IO
                                for (int j = 0; j < noOfTransitions; j++) {
                                        String src = 
nameSerializer.deserialize(in);
                                        
Preconditions.checkState(src.equals(srcName),
-                                                       "Source Edge names do 
not match (" + srcName + " - " + src + ").");
+                                                                "Source Edge 
names do not match (" + srcName + " - " + src + ").");
 
                                        String trgt = 
nameSerializer.deserialize(in);
                                        StateTransitionAction action = 
actionSerializer.deserialize(in);
@@ -1218,8 +1710,6 @@ private void serializeCondition(IterativeCondition<T> 
condition, DataOutputView
                }
        }
 
-       //////////////////                      Old Serialization               
        //////////////////////
-
        /**
         * A {@link TypeSerializer} for {@link NFA} that uses Java 
Serialization.
         */
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 2e3aefd0dc7..86a2be552e4 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -94,9 +96,11 @@
        ///////////////                 State                   //////////////
 
        private static final String NFA_OPERATOR_STATE_NAME = 
"nfaOperatorStateName";
+       private static final String NFA_METASTATES_OPERATOR_STATE_NAME = 
"nfaMetaStatesOperatorStateName";
        private static final String EVENT_QUEUE_STATE_NAME = 
"eventQueuesStateName";
 
        private transient ValueState<NFA<IN>> nfaOperatorState;
+       private transient ListState<NFA.MetaStates<IN>> 
nfaMetaStatesOperatorState;
        private transient MapState<Long, List<IN>> elementQueueState;
 
        private final NFACompiler.NFAFactory<IN> nfaFactory;
@@ -134,11 +138,24 @@ public AbstractKeyedCEPPatternOperator(
        public void initializeState(StateInitializationContext context) throws 
Exception {
                super.initializeState(context);
 
+               if (nfaMetaStatesOperatorState == null) {
+                       nfaMetaStatesOperatorState = 
getOperatorStateBackend().getUnionListState(
+                               new ListStateDescriptor<> (
+                                       NFA_METASTATES_OPERATOR_STATE_NAME,
+                                       new 
NFA.MetaStates.MetaStatesSerializer<IN>()));
+               }
+
                if (nfaOperatorState == null) {
+                       NFA.MetaStates<IN> metaStates = getNFAMetaStates();
+                       if (metaStates == null) {
+                               metaStates = 
nfaFactory.createNFA().getMetaStates();
+                               updateNFAMetaStates(metaStates);
+                       }
+
                        nfaOperatorState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>(
-                                               NFA_OPERATOR_STATE_NAME,
-                                               new 
NFA.NFASerializer<>(inputSerializer)));
+                                       NFA_OPERATOR_STATE_NAME,
+                                       new 
NFA.NFASerializerV2<>(inputSerializer, metaStates)));
                }
 
                if (elementQueueState == null) {
@@ -264,6 +281,20 @@ private void updateLastSeenWatermark(long timestamp) {
                this.lastWatermark = timestamp;
        }
 
+       private NFA.MetaStates<IN> getNFAMetaStates() throws Exception {
+               Iterable<NFA.MetaStates<IN>> listState = 
nfaMetaStatesOperatorState.get();
+               if (listState != null && listState.iterator().hasNext()) {
+                       return listState.iterator().next();
+               } else {
+                       return null;
+               }
+       }
+
+       private void updateNFAMetaStates(NFA.MetaStates<IN> metaStates) throws 
Exception {
+               nfaMetaStatesOperatorState.clear();
+               nfaMetaStatesOperatorState.add(metaStates);
+       }
+
        private NFA<IN> getNFA() throws IOException {
                NFA<IN> nfa = nfaOperatorState.value();
                return nfa != null ? nfa : nfaFactory.createNFA();
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 25863423fbc..ace97d6ce88 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -299,7 +299,8 @@ public boolean filter(Event value) throws Exception {
                        nfa.process(d, 7);
                        nfa.process(a, 8);
 
-                       NFA.NFASerializer<Event> serializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+                       NFA.NFASerializerV2<Event> serializer =
+                               new 
NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
 
                        //serialize
                        ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
@@ -307,7 +308,8 @@ public boolean filter(Event value) throws Exception {
                        baos.close();
 
                        // copy
-                       NFA.NFASerializer<Event> copySerializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+                       NFA.NFASerializerV2<Event> copySerializer =
+                               new 
NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
                        ByteArrayInputStream in = new 
ByteArrayInputStream(baos.toByteArray());
                        ByteArrayOutputStream out = new ByteArrayOutputStream();
                        copySerializer.copy(new DataInputViewStreamWrapper(in), 
new DataOutputViewStreamWrapper(out));
@@ -316,7 +318,8 @@ public boolean filter(Event value) throws Exception {
 
                        // deserialize
                        ByteArrayInputStream bais = new 
ByteArrayInputStream(out.toByteArray());
-                       NFA.NFASerializer<Event> deserializer = new 
NFA.NFASerializer<>(Event.createTypeSerializer());
+                       NFA.NFASerializerV2<Event> deserializer =
+                               new 
NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
                        NFA<Event> copy = deserializer.deserialize(new 
DataInputViewStreamWrapper(bais));
                        bais.close();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to