This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 904fac2e0149318bf2d1415edf8a4775f96fa184
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Tue Jun 6 16:40:17 2023 +0800

    [FLINK-30613][serializer] Remove useless correspondingSerializerClass of CompositeTypeSerializerSnapshot
---
 .../typeutils/CompositeTypeSerializerSnapshot.java       | 16 +++++++---------
 .../SingleThreadAccessCheckingTypeSerializer.java        |  8 ++------
 .../typeutils/base/GenericArraySerializerSnapshot.java   |  5 +----
 .../common/typeutils/base/ListSerializerSnapshot.java    |  4 +---
 .../api/common/typeutils/base/MapSerializerSnapshot.java |  4 +---
 .../typeutils/runtime/JavaEitherSerializerSnapshot.java  |  5 +----
 .../api/java/typeutils/runtime/NullableSerializer.java   |  5 +----
 .../flink/api/java/typeutils/runtime/RowSerializer.java  |  4 +---
 .../java/typeutils/runtime/TupleSerializerSnapshot.java  |  5 +----
 .../api/common/typeutils/CompositeSerializerTest.java    |  1 -
 .../typeutils/CompositeTypeSerializerSnapshotTest.java   |  4 +---
 .../apache/flink/streaming/tests/verify/ValueWithTs.java |  4 +---
 .../src/main/java/org/apache/flink/cep/nfa/NFA.java      |  4 +---
 .../apache/flink/cep/nfa/NFAStateSerializerSnapshot.java |  4 +---
 .../main/java/org/apache/flink/cep/nfa/SharedBuffer.java |  4 +---
 .../nfa/sharedbuffer/LockableTypeSerializerSnapshot.java |  4 +---
 .../org/apache/flink/cep/nfa/sharedbuffer/NodeId.java    |  4 +---
 .../flink/cep/nfa/sharedbuffer/SharedBufferEdge.java     |  4 +---
 .../flink/cep/nfa/sharedbuffer/SharedBufferNode.java     |  4 +---
 .../SharedBufferNodeSerializerSnapshotV2.java            |  4 +---
 .../flink/runtime/state/ArrayListSerializerSnapshot.java |  4 +---
 .../apache/flink/runtime/state/ttl/TtlStateFactory.java  |  4 +---
 .../typeutils/ScalaCaseClassSerializerSnapshot.java      |  4 +---
 .../scala/typeutils/ScalaEitherSerializerSnapshot.java   |  4 +---
 .../scala/typeutils/ScalaOptionSerializerSnapshot.java   |  4 +---
 .../api/scala/typeutils/ScalaTrySerializerSnapshot.java  |  4 +---
 .../scala/typeutils/TraversableSerializerSnapshot.java   |  5 +----
 .../typeutils/Tuple2CaseClassSerializerSnapshot.java     |  9 +--------
 .../flink/streaming/api/datastream/CoGroupedStreams.java |  4 +---
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java   |  4 +---
 .../streaming/api/operators/TimerSerializerSnapshot.java |  4 +---
 .../streaming/api/operators/co/IntervalJoinOperator.java |  4 +---
 .../runtime/streamrecord/StreamElementSerializer.java    |  4 +---
 .../flink/table/dataview/ListViewSerializerSnapshot.java |  4 +---
 .../flink/table/dataview/MapViewSerializerSnapshot.java  |  4 +---
 .../table/dataview/NullAwareMapSerializerSnapshot.java   |  4 +---
 .../table/runtime/typeutils/ExternalSerializer.java      |  4 +---
 .../table/runtime/typeutils/LinkedListSerializer.java    |  4 +---
 .../table/runtime/typeutils/RawValueDataSerializer.java  |  4 +---
 .../table/runtime/typeutils/WindowKeySerializer.java     |  4 +---
 40 files changed, 46 insertions(+), 137 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index 3de2b46d006..f96acd6af0c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 import static 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -123,30 +122,29 @@ public abstract class CompositeTypeSerializerSnapshot<T, 
S extends TypeSerialize
 
     private NestedSerializersSnapshotDelegate 
nestedSerializersSnapshotDelegate;
 
-    private final Class<S> correspondingSerializerClass;
-
     /**
      * Constructor to be used for read instantiation.
      *
+     * @deprecated correspondingSerializerClass is not used to resolve and cast after FLIP-263,
+     *     please use {@link #CompositeTypeSerializerSnapshot()} instead.
      * @param correspondingSerializerClass the expected class of the new serializer.
      */
-    @SuppressWarnings("unchecked")
+    @Deprecated
     public CompositeTypeSerializerSnapshot(
-            Class<? extends TypeSerializer> correspondingSerializerClass) {
-        this.correspondingSerializerClass = (Class<S>) 
checkNotNull(correspondingSerializerClass);
-    }
+            Class<? extends TypeSerializer> correspondingSerializerClass) {}
+
+    /** Constructor to be used for read instantiation. */
+    public CompositeTypeSerializerSnapshot() {}
 
     /**
      * Constructor to be used for writing the snapshot.
      *
      * @param serializerInstance an instance of the originating serializer of this snapshot.
      */
-    @SuppressWarnings("unchecked")
     public CompositeTypeSerializerSnapshot(S serializerInstance) {
         checkNotNull(serializerInstance);
         this.nestedSerializersSnapshotDelegate =
                 new 
NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance));
-        this.correspondingSerializerClass = (Class<S>) 
serializerInstance.getClass();
     }
 
     @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
index 9234cf9a0d1..6f03b595c31 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java
@@ -151,12 +151,8 @@ public class SingleThreadAccessCheckingTypeSerializer<T> 
extends TypeSerializer<
             extends CompositeTypeSerializerSnapshot<
                     T, SingleThreadAccessCheckingTypeSerializer<T>> {
 
-        @SuppressWarnings({"unchecked", "unused"})
-        public SingleThreadAccessCheckingTypeSerializerSnapshot() {
-            super(
-                    (Class<SingleThreadAccessCheckingTypeSerializer<T>>)
-                            (Class<?>) 
SingleThreadAccessCheckingTypeSerializer.class);
-        }
+        @SuppressWarnings("unused")
+        public SingleThreadAccessCheckingTypeSerializerSnapshot() {}
 
         SingleThreadAccessCheckingTypeSerializerSnapshot(
                 SingleThreadAccessCheckingTypeSerializer<T> 
serializerInstance) {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index bdb6cef922c..6777aba2972 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -43,9 +43,7 @@ public final class GenericArraySerializerSnapshot<C>
     private Class<C> componentClass;
 
     /** Constructor to be used for read instantiation. */
-    public GenericArraySerializerSnapshot() {
-        super(GenericArraySerializer.class);
-    }
+    public GenericArraySerializerSnapshot() {}
 
     /** Constructor to be used for writing the snapshot. */
     public GenericArraySerializerSnapshot(GenericArraySerializer<C> 
genericArraySerializer) {
@@ -59,7 +57,6 @@ public final class GenericArraySerializerSnapshot<C>
      */
     @SuppressWarnings("deprecation")
     GenericArraySerializerSnapshot(Class<C> componentClass) {
-        super(GenericArraySerializer.class);
         this.componentClass = componentClass;
     }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 85968cb9417..e474f57e868 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -30,9 +30,7 @@ public class ListSerializerSnapshot<T>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public ListSerializerSnapshot() {
-        super(ListSerializer.class);
-    }
+    public ListSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public ListSerializerSnapshot(ListSerializer<T> listSerializer) {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index a9afe28ded9..2d48cb6a7ab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -31,9 +31,7 @@ public class MapSerializerSnapshot<K, V>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public MapSerializerSnapshot() {
-        super(MapSerializer.class);
-    }
+    public MapSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
index 2266cea6eec..e33ce8e6ac3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -34,10 +34,7 @@ public class JavaEitherSerializerSnapshot<L, R>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    @SuppressWarnings("unused")
-    public JavaEitherSerializerSnapshot() {
-        super(EitherSerializer.class);
-    }
+    public JavaEitherSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public JavaEitherSerializerSnapshot(EitherSerializer<L, R> 
eitherSerializer) {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index 635b2f1323d..2517f101a4f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -271,9 +271,7 @@ public class NullableSerializer<T> extends 
TypeSerializer<T> {
         private int nullPaddingLength;
 
         @SuppressWarnings("unused")
-        public NullableSerializerSnapshot() {
-            super(NullableSerializer.class);
-        }
+        public NullableSerializerSnapshot() {}
 
         public NullableSerializerSnapshot(NullableSerializer<T> 
serializerInstance) {
             super(serializerInstance);
@@ -281,7 +279,6 @@ public class NullableSerializer<T> extends 
TypeSerializer<T> {
         }
 
         private NullableSerializerSnapshot(int nullPaddingLength) {
-            super(NullableSerializer.class);
             checkArgument(
                     nullPaddingLength >= 0,
                     "Computed NULL padding can not be negative. %s",
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index e901b5e0cf3..cef69e27ed2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -485,9 +485,7 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
 
         private boolean supportsRowKind = true;
 
-        public RowSerializerSnapshot() {
-            super(RowSerializer.class);
-        }
+        public RowSerializerSnapshot() {}
 
         RowSerializerSnapshot(RowSerializer serializerInstance) {
             super(serializerInstance);
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
index abaeaee805a..59406b18624 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java
@@ -41,9 +41,7 @@ public final class TupleSerializerSnapshot<T extends Tuple>
     private Class<T> tupleClass;
 
     @SuppressWarnings("unused")
-    public TupleSerializerSnapshot() {
-        super(TupleSerializer.class);
-    }
+    public TupleSerializerSnapshot() {}
 
     TupleSerializerSnapshot(TupleSerializer<T> serializerInstance) {
         super(serializerInstance);
@@ -56,7 +54,6 @@ public final class TupleSerializerSnapshot<T extends Tuple>
      * 
TupleSerializer#resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass}.
      */
     TupleSerializerSnapshot(Class<T> tupleClass) {
-        super(TupleSerializer.class);
         this.tupleClass = checkNotNull(tupleClass, "tuple class can not be 
NULL");
     }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
index b640e4d38c2..4169b89b9c8 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
@@ -214,7 +214,6 @@ class CompositeSerializerTest {
 
         /** Constructor for read instantiation. */
         public TestListCompositeSerializerSnapshot() {
-            super(TestListCompositeSerializer.class);
             this.isImmutableTargetType = false;
         }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
index d60c4f9713f..4b3b8fda623 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -367,9 +367,7 @@ public class CompositeTypeSerializerSnapshotTest {
 
         private OuterSchemaCompatibility mockOuterSchemaCompatibility;
 
-        public TestCompositeTypeSerializerSnapshot() {
-            super(TestCompositeTypeSerializer.class);
-        }
+        public TestCompositeTypeSerializerSnapshot() {}
 
         TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer 
serializer) {
             super(serializer);
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
index 8e8b7d412fb..efdd6ff04b8 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -122,9 +122,7 @@ public class ValueWithTs<V> implements Serializable {
         private static final int VERSION = 2;
 
         @SuppressWarnings("unused")
-        public ValueWithTsSerializerSnapshot() {
-            super(Serializer.class);
-        }
+        public ValueWithTsSerializerSnapshot() {}
 
         ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
             super(serializerInstance);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index e5214def387..841d295286b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -984,9 +984,7 @@ public class NFA<T> {
 
         private static final int VERSION = 2;
 
-        public MigratedNFASerializerSnapshot() {
-            super(NFASerializer.class);
-        }
+        public MigratedNFASerializerSnapshot() {}
 
         MigratedNFASerializerSnapshot(NFASerializer<T> legacyNfaSerializer) {
             super(legacyNfaSerializer);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
index a62071a847f..fe532be318a 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
@@ -39,9 +39,7 @@ public class NFAStateSerializerSnapshot
     private boolean supportsPreviousTimestamp = true;
 
     /** Constructor for read instantiation. */
-    public NFAStateSerializerSnapshot() {
-        super(NFAStateSerializer.class);
-    }
+    public NFAStateSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public NFAStateSerializerSnapshot(NFAStateSerializer serializerInstance) {
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 2acbcf538d8..9285a51a881 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -159,9 +159,7 @@ public class SharedBuffer<V> {
 
         private static final int VERSION = 2;
 
-        public SharedBufferSerializerSnapshot() {
-            super(SharedBufferSerializer.class);
-        }
+        public SharedBufferSerializerSnapshot() {}
 
         public SharedBufferSerializerSnapshot(SharedBufferSerializer<K, V> 
sharedBufferSerializer) {
             super(sharedBufferSerializer);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 5881a6fa971..d2a8dd4fd7f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -31,9 +31,7 @@ public class LockableTypeSerializerSnapshot<E>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public LockableTypeSerializerSnapshot() {
-        super(Lockable.LockableTypeSerializer.class);
-    }
+    public LockableTypeSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public LockableTypeSerializerSnapshot(
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index a4d3c180c56..f605e4f6236 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -167,9 +167,7 @@ public class NodeId {
 
             private static final int VERSION = 1;
 
-            public NodeIdSerializerSnapshot() {
-                super(NodeIdSerializer.class);
-            }
+            public NodeIdSerializerSnapshot() {}
 
             public NodeIdSerializerSnapshot(NodeIdSerializer nodeIdSerializer) 
{
                 super(nodeIdSerializer);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index 236623d5531..1fa02645fdc 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -170,9 +170,7 @@ public class SharedBufferEdge {
 
             private static final int VERSION = 1;
 
-            public SharedBufferEdgeSerializerSnapshot() {
-                super(SharedBufferEdgeSerializer.class);
-            }
+            public SharedBufferEdgeSerializerSnapshot() {}
 
             public SharedBufferEdgeSerializerSnapshot(
                     SharedBufferEdgeSerializer sharedBufferEdgeSerializer) {
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index 9f7d97da4bd..83394964047 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -171,9 +171,7 @@ public class SharedBufferNode {
 
             private static final int VERSION = 1;
 
-            public SharedBufferNodeSerializerSnapshot() {
-                super(SharedBufferNodeSerializer.class);
-            }
+            public SharedBufferNodeSerializerSnapshot() {}
 
             public SharedBufferNodeSerializerSnapshot(
                     SharedBufferNodeSerializer sharedBufferNodeSerializer) {
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java
index 9d963e829b9..06d6ca578b4 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java
@@ -27,9 +27,7 @@ public final class SharedBufferNodeSerializerSnapshotV2
 
     private static final int VERSION = 1;
 
-    public SharedBufferNodeSerializerSnapshotV2() {
-        super(SharedBufferNodeSerializer.class);
-    }
+    public SharedBufferNodeSerializerSnapshotV2() {}
 
     public SharedBufferNodeSerializerSnapshotV2(
             SharedBufferNodeSerializer sharedBufferNodeSerializer) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 18c49333463..fb8ec70a362 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -30,9 +30,7 @@ public class ArrayListSerializerSnapshot<T>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public ArrayListSerializerSnapshot() {
-        super(ArrayListSerializer.class);
-    }
+    public ArrayListSerializerSnapshot() {}
 
     /** Constructor for creating the snapshot for writing. */
     public ArrayListSerializerSnapshot(ArrayListSerializer<T> 
arrayListSerializer) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 45eb2195cd8..df93d411472 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -349,9 +349,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends 
State, IS extends S> {
         private static final int VERSION = 2;
 
         @SuppressWarnings({"WeakerAccess", "unused"})
-        public TtlSerializerSnapshot() {
-            super(TtlSerializer.class);
-        }
+        public TtlSerializerSnapshot() {}
 
         TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
             super(serializerInstance);
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
index 243d03bd44b..543e539f359 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
@@ -43,9 +43,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends 
scala.Product>
 
     /** Used via reflection. */
     @SuppressWarnings("unused")
-    public ScalaCaseClassSerializerSnapshot() {
-        super(ScalaCaseClassSerializer.class);
-    }
+    public ScalaCaseClassSerializerSnapshot() {}
 
     /** Used for the snapshot path. */
     public ScalaCaseClassSerializerSnapshot(ScalaCaseClassSerializer<T> 
serializerInstance) {
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index dc88ea46a1b..a6cc00c315a 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -33,9 +33,7 @@ public class ScalaEitherSerializerSnapshot<L, R>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public ScalaEitherSerializerSnapshot() {
-        super(EitherSerializer.class);
-    }
+    public ScalaEitherSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> 
eitherSerializer) {
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
index faa4b3f8937..ab8f94654e1 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -31,9 +31,7 @@ public final class ScalaOptionSerializerSnapshot<E>
     private static final int VERSION = 2;
 
     @SuppressWarnings("WeakerAccess")
-    public ScalaOptionSerializerSnapshot() {
-        super(OptionSerializer.class);
-    }
+    public ScalaOptionSerializerSnapshot() {}
 
     public ScalaOptionSerializerSnapshot(OptionSerializer<E> 
serializerInstance) {
         super(serializerInstance);
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
index 4a9e44b19e5..8c59e61a673 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java
@@ -38,9 +38,7 @@ public class ScalaTrySerializerSnapshot<E>
 
     /** This empty nullary constructor is required for deserializing the configuration. */
     @SuppressWarnings("unused")
-    public ScalaTrySerializerSnapshot() {
-        super(TrySerializer.class);
-    }
+    public ScalaTrySerializerSnapshot() {}
 
     public ScalaTrySerializerSnapshot(TrySerializer<E> trySerializer) {
         super(trySerializer);
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
index c7daa6c3ce0..23619b6f39c 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java
@@ -47,9 +47,7 @@ public class TraversableSerializerSnapshot<T extends 
TraversableOnce<E>, E>
     private String cbfCode;
 
     @SuppressWarnings("unused")
-    public TraversableSerializerSnapshot() {
-        super(TraversableSerializer.class);
-    }
+    public TraversableSerializerSnapshot() {}
 
     public TraversableSerializerSnapshot(TraversableSerializer<T, E> 
serializerInstance) {
         super(serializerInstance);
@@ -57,7 +55,6 @@ public class TraversableSerializerSnapshot<T extends 
TraversableOnce<E>, E>
     }
 
     TraversableSerializerSnapshot(String cbfCode) {
-        super(TraversableSerializer.class);
         checkArgument(cbfCode != null, "cbfCode cannot be null");
 
         this.cbfCode = cbfCode;
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
index c35a853f8b8..dc040d0f323 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java
@@ -46,9 +46,7 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2>
     private Class<Tuple2<T1, T2>> type;
 
     @SuppressWarnings("unused")
-    public Tuple2CaseClassSerializerSnapshot() {
-        super(correspondingSerializerClass());
-    }
+    public Tuple2CaseClassSerializerSnapshot() {}
 
     public Tuple2CaseClassSerializerSnapshot(
             ScalaCaseClassSerializer<Tuple2<T1, T2>> serializerInstance) {
@@ -102,9 +100,4 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2>
                 ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
                 : OuterSchemaCompatibility.INCOMPATIBLE;
     }
-
-    private static <T1, T2>
-            Class<ScalaCaseClassSerializer<scala.Tuple2<T1, T2>>> 
correspondingSerializerClass() {
-        return package$.MODULE$.tuple2ClassForJava();
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index b9a253d4e25..f86ad9f3ab8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -752,9 +752,7 @@ public class CoGroupedStreams<T1, T2> {
         private static final int VERSION = 2;
 
         @SuppressWarnings("WeakerAccess")
-        public UnionSerializerSnapshot() {
-            super(UnionSerializer.class);
-        }
+        public UnionSerializerSnapshot() {}
 
         UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) {
             super(serializerInstance);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 46849154096..bef0b4b120d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -890,9 +890,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS
         private boolean supportsNullTransaction = true;
 
         @SuppressWarnings("WeakerAccess")
-        public StateSerializerSnapshot() {
-            super(StateSerializer.class);
-        }
+        public StateSerializerSnapshot() {}
 
         StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> serializerInstance) {
             super(serializerInstance);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
index c85e7ca8d82..7a805611502 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
@@ -30,9 +30,7 @@ public class TimerSerializerSnapshot<K, N>
 
     private static final int VERSION = 2;
 
-    public TimerSerializerSnapshot() {
-        super(TimerSerializer.class);
-    }
+    public TimerSerializerSnapshot() {}
 
     public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) {
         super(timerSerializer);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 33376faf858..614b26979e6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -513,9 +513,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
         private static final int VERSION = 2;
 
         @SuppressWarnings({"unused", "WeakerAccess"})
-        public BufferEntrySerializerSnapshot() {
-            super(BufferEntrySerializer.class);
-        }
+        public BufferEntrySerializerSnapshot() {}
 
         BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
             super(serializerInstance);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 4140c52a9ec..f06de1b8c26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -278,9 +278,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
         private static final int VERSION = 2;
 
         @SuppressWarnings("WeakerAccess")
-        public StreamElementSerializerSnapshot() {
-            super(StreamElementSerializer.class);
-        }
+        public StreamElementSerializerSnapshot() {}
 
         StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
             super(serializerInstance);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index a240181f11f..539e6197f14 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -39,9 +39,7 @@ public final class ListViewSerializerSnapshot<T>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public ListViewSerializerSnapshot() {
-        super(ListViewSerializer.class);
-    }
+    public ListViewSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index e4b7046e83f..a2b5bc3031d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -40,9 +40,7 @@ public class MapViewSerializerSnapshot<K, V>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public MapViewSerializerSnapshot() {
-        super(MapViewSerializer.class);
-    }
+    public MapViewSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java
index 196b94800ad..7780f750007 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java
@@ -38,9 +38,7 @@ public class NullAwareMapSerializerSnapshot<K, V>
     private static final int CURRENT_VERSION = 1;
 
     /** Constructor for read instantiation. */
-    public NullAwareMapSerializerSnapshot() {
-        super(NullAwareMapSerializer.class);
-    }
+    public NullAwareMapSerializerSnapshot() {}
 
     /** Constructor to create the snapshot for writing. */
     public NullAwareMapSerializerSnapshot(NullAwareMapSerializer<K, V> mapViewSerializer) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
index d9e7caabff2..4c712c25414 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java
@@ -248,9 +248,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> {
 
         private boolean isInternalInput;
 
-        public ExternalSerializerSnapshot() {
-            super(ExternalSerializer.class);
-        }
+        public ExternalSerializerSnapshot() {}
 
         public ExternalSerializerSnapshot(ExternalSerializer<I, E> externalSerializer) {
             super(externalSerializer);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
index 59498a5fa6e..0720e20c8fa 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
@@ -223,9 +223,7 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
         private boolean hasNullMask = true;
 
         /** Constructor for read instantiation. */
-        public LinkedListSerializerSnapshot() {
-            super(LinkedListSerializer.class);
-        }
+        public LinkedListSerializerSnapshot() {}
 
         /** Constructor to create the snapshot for writing. */
         public LinkedListSerializerSnapshot(LinkedListSerializer<T> listSerializer) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
index 3ee21dfdb9b..6fda57c7411 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
@@ -149,9 +149,7 @@ public final class RawValueDataSerializer<T> extends TypeSerializer<RawValueData
             extends CompositeTypeSerializerSnapshot<RawValueData<T>, RawValueDataSerializer<T>> {
 
         @SuppressWarnings("unused")
-        public RawValueDataSerializerSnapshot() {
-            super(RawValueDataSerializer.class);
-        }
+        public RawValueDataSerializerSnapshot() {}
 
         public RawValueDataSerializerSnapshot(RawValueDataSerializer<T> serializerInstance) {
             super(serializerInstance);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
index fa19a393842..b4175cc7157 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java
@@ -177,9 +177,7 @@ public class WindowKeySerializer extends PagedTypeSerializer<WindowKey> {
         private static final int CURRENT_VERSION = 1;
 
         /** This empty nullary constructor is required for deserializing the configuration. */
-        public WindowKeySerializerSnapshot() {
-            super(WindowKeySerializer.class);
-        }
+        public WindowKeySerializerSnapshot() {}
 
         public WindowKeySerializerSnapshot(WindowKeySerializer serializerInstance) {
             super(serializerInstance);


Reply via email to