This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f522e65cce7c53128f77f38540b762a5ba5ac905
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Jan 23 17:07:26 2023 +0800

    [FLINK-30613][serializer] Migrate NFAStateSerializerSnapshot to implement 
new method of resolving schema compatibility
---
 .../org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
index 0947315cea0..a62071a847f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.core.memory.DataInputView;
@@ -71,8 +72,12 @@ public class NFAStateSerializerSnapshot
 
     @Override
     protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
-            NFAStateSerializer newSerializer) {
-        if (supportsPreviousTimestamp != 
newSerializer.isSupportsPreviousTimestamp()) {
+            TypeSerializerSnapshot<NFAState> oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof NFAStateSerializerSnapshot)) {
+            return OuterSchemaCompatibility.INCOMPATIBLE;
+        }
+        if (supportsPreviousTimestamp
+                != ((NFAStateSerializerSnapshot) 
oldSerializerSnapshot).supportsPreviousTimestamp) {
             return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
         }
 

Reply via email to