This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 904fac2e0149318bf2d1415edf8a4775f96fa184 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Tue Jun 6 16:40:17 2023 +0800 [FLINK-30613][serializer] Remove useless correspondingSerializerClass of CompositeTypeSerializerSnapshot --- .../typeutils/CompositeTypeSerializerSnapshot.java | 16 +++++++--------- .../SingleThreadAccessCheckingTypeSerializer.java | 8 ++------ .../typeutils/base/GenericArraySerializerSnapshot.java | 5 +---- .../common/typeutils/base/ListSerializerSnapshot.java | 4 +--- .../api/common/typeutils/base/MapSerializerSnapshot.java | 4 +--- .../typeutils/runtime/JavaEitherSerializerSnapshot.java | 5 +---- .../api/java/typeutils/runtime/NullableSerializer.java | 5 +---- .../flink/api/java/typeutils/runtime/RowSerializer.java | 4 +--- .../java/typeutils/runtime/TupleSerializerSnapshot.java | 5 +---- .../api/common/typeutils/CompositeSerializerTest.java | 1 - .../typeutils/CompositeTypeSerializerSnapshotTest.java | 4 +--- .../apache/flink/streaming/tests/verify/ValueWithTs.java | 4 +--- .../src/main/java/org/apache/flink/cep/nfa/NFA.java | 4 +--- .../apache/flink/cep/nfa/NFAStateSerializerSnapshot.java | 4 +--- .../main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 4 +--- .../nfa/sharedbuffer/LockableTypeSerializerSnapshot.java | 4 +--- .../org/apache/flink/cep/nfa/sharedbuffer/NodeId.java | 4 +--- .../flink/cep/nfa/sharedbuffer/SharedBufferEdge.java | 4 +--- .../flink/cep/nfa/sharedbuffer/SharedBufferNode.java | 4 +--- .../SharedBufferNodeSerializerSnapshotV2.java | 4 +--- .../flink/runtime/state/ArrayListSerializerSnapshot.java | 4 +--- .../apache/flink/runtime/state/ttl/TtlStateFactory.java | 4 +--- .../typeutils/ScalaCaseClassSerializerSnapshot.java | 4 +--- .../scala/typeutils/ScalaEitherSerializerSnapshot.java | 4 +--- .../scala/typeutils/ScalaOptionSerializerSnapshot.java | 4 +--- .../api/scala/typeutils/ScalaTrySerializerSnapshot.java | 4 +--- .../scala/typeutils/TraversableSerializerSnapshot.java | 5 +---- .../typeutils/Tuple2CaseClassSerializerSnapshot.java | 9 +-------- .../flink/streaming/api/datastream/CoGroupedStreams.java | 4 +--- .../api/functions/sink/TwoPhaseCommitSinkFunction.java | 4 +--- .../streaming/api/operators/TimerSerializerSnapshot.java | 4 +--- .../streaming/api/operators/co/IntervalJoinOperator.java | 4 +--- .../runtime/streamrecord/StreamElementSerializer.java | 4 +--- .../flink/table/dataview/ListViewSerializerSnapshot.java | 4 +--- .../flink/table/dataview/MapViewSerializerSnapshot.java | 4 +--- .../table/dataview/NullAwareMapSerializerSnapshot.java | 4 +--- .../table/runtime/typeutils/ExternalSerializer.java | 4 +--- .../table/runtime/typeutils/LinkedListSerializer.java | 4 +--- .../table/runtime/typeutils/RawValueDataSerializer.java | 4 +--- .../table/runtime/typeutils/WindowKeySerializer.java | 4 +--- 40 files changed, 46 insertions(+), 137 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java index 3de2b46d006..f96acd6af0c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java @@ -28,7 +28,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; -import java.util.Arrays; import static org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -123,30 +122,29 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize private NestedSerializersSnapshotDelegate nestedSerializersSnapshotDelegate; - private final Class<S> correspondingSerializerClass; - /** * Constructor to be used for read instantiation. * + * @deprecated correspondingSerializerClass is not used to resolve and cast after FLIP-263, + * please use {@link CompositeTypeSerializerSnapshot()} instead. * @param correspondingSerializerClass the expected class of the new serializer. */ - @SuppressWarnings("unchecked") + @Deprecated public CompositeTypeSerializerSnapshot( - Class<? extends TypeSerializer> correspondingSerializerClass) { - this.correspondingSerializerClass = (Class<S>) checkNotNull(correspondingSerializerClass); - } + Class<? extends TypeSerializer> correspondingSerializerClass) {} + + /** Constructor to be used for read instantiation. */ + public CompositeTypeSerializerSnapshot() {} /** * Constructor to be used for writing the snapshot. * * @param serializerInstance an instance of the originating serializer of this snapshot. */ - @SuppressWarnings("unchecked") public CompositeTypeSerializerSnapshot(S serializerInstance) { checkNotNull(serializerInstance); this.nestedSerializersSnapshotDelegate = new NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance)); - this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass(); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java index 9234cf9a0d1..6f03b595c31 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SingleThreadAccessCheckingTypeSerializer.java @@ -151,12 +151,8 @@ public class SingleThreadAccessCheckingTypeSerializer<T> extends TypeSerializer< extends CompositeTypeSerializerSnapshot< T, SingleThreadAccessCheckingTypeSerializer<T>> { - @SuppressWarnings({"unchecked", "unused"}) - public SingleThreadAccessCheckingTypeSerializerSnapshot() { - super( - (Class<SingleThreadAccessCheckingTypeSerializer<T>>) - (Class<?>) SingleThreadAccessCheckingTypeSerializer.class); - } + @SuppressWarnings("unused") + public SingleThreadAccessCheckingTypeSerializerSnapshot() {} SingleThreadAccessCheckingTypeSerializerSnapshot( SingleThreadAccessCheckingTypeSerializer<T> serializerInstance) { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java index bdb6cef922c..6777aba2972 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java @@ -43,9 +43,7 @@ public final class GenericArraySerializerSnapshot<C> private Class<C> componentClass; /** Constructor to be used for read instantiation. */ - public GenericArraySerializerSnapshot() { - super(GenericArraySerializer.class); - } + public GenericArraySerializerSnapshot() {} /** Constructor to be used for writing the snapshot. */ public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) { @@ -59,7 +57,6 @@ public final class GenericArraySerializerSnapshot<C> */ @SuppressWarnings("deprecation") GenericArraySerializerSnapshot(Class<C> componentClass) { - super(GenericArraySerializer.class); this.componentClass = componentClass; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java index 85968cb9417..e474f57e868 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java @@ -30,9 +30,7 @@ public class ListSerializerSnapshot<T> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public ListSerializerSnapshot() { - super(ListSerializer.class); - } + public ListSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public ListSerializerSnapshot(ListSerializer<T> listSerializer) { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java index a9afe28ded9..2d48cb6a7ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java @@ -31,9 +31,7 @@ public class MapSerializerSnapshot<K, V> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public MapSerializerSnapshot() { - super(MapSerializer.class); - } + public MapSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) { diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java index 2266cea6eec..e33ce8e6ac3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java @@ -34,10 +34,7 @@ public class JavaEitherSerializerSnapshot<L, R> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - @SuppressWarnings("unused") - public JavaEitherSerializerSnapshot() { - super(EitherSerializer.class); - } + public JavaEitherSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public JavaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) { diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java index 635b2f1323d..2517f101a4f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java @@ -271,9 +271,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> { private int nullPaddingLength; @SuppressWarnings("unused") - public NullableSerializerSnapshot() { - super(NullableSerializer.class); - } + public NullableSerializerSnapshot() {} public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) { super(serializerInstance); @@ -281,7 +279,6 @@ public class NullableSerializer<T> extends TypeSerializer<T> { } private NullableSerializerSnapshot(int nullPaddingLength) { - super(NullableSerializer.class); checkArgument( nullPaddingLength >= 0, "Computed NULL padding can not be negative. %s", diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index e901b5e0cf3..cef69e27ed2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -485,9 +485,7 @@ public final class RowSerializer extends TypeSerializer<Row> { private boolean supportsRowKind = true; - public RowSerializerSnapshot() { - super(RowSerializer.class); - } + public RowSerializerSnapshot() {} RowSerializerSnapshot(RowSerializer serializerInstance) { super(serializerInstance); diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java index abaeaee805a..59406b18624 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerSnapshot.java @@ -41,9 +41,7 @@ public final class TupleSerializerSnapshot<T extends Tuple> private Class<T> tupleClass; @SuppressWarnings("unused") - public TupleSerializerSnapshot() { - super(TupleSerializer.class); - } + public TupleSerializerSnapshot() {} TupleSerializerSnapshot(TupleSerializer<T> serializerInstance) { super(serializerInstance); @@ -56,7 +54,6 @@ public final class TupleSerializerSnapshot<T extends Tuple> * TupleSerializer#resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass}. */ TupleSerializerSnapshot(Class<T> tupleClass) { - super(TupleSerializer.class); this.tupleClass = checkNotNull(tupleClass, "tuple class can not be NULL"); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java index b640e4d38c2..4169b89b9c8 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java @@ -214,7 +214,6 @@ class CompositeSerializerTest { /** Constructor for read instantiation. */ public TestListCompositeSerializerSnapshot() { - super(TestListCompositeSerializer.class); this.isImmutableTargetType = false; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java index d60c4f9713f..4b3b8fda623 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java @@ -367,9 +367,7 @@ public class CompositeTypeSerializerSnapshotTest { private OuterSchemaCompatibility mockOuterSchemaCompatibility; - public TestCompositeTypeSerializerSnapshot() { - super(TestCompositeTypeSerializer.class); - } + public TestCompositeTypeSerializerSnapshot() {} TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) { super(serializer); diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java index 8e8b7d412fb..efdd6ff04b8 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -122,9 +122,7 @@ public class ValueWithTs<V> implements Serializable { private static final int VERSION = 2; @SuppressWarnings("unused") - public ValueWithTsSerializerSnapshot() { - super(Serializer.class); - } + public ValueWithTsSerializerSnapshot() {} ValueWithTsSerializerSnapshot(Serializer serializerInstance) { super(serializerInstance); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index e5214def387..841d295286b 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -984,9 +984,7 @@ public class NFA<T> { private static final int VERSION = 2; - public MigratedNFASerializerSnapshot() { - super(NFASerializer.class); - } + public MigratedNFASerializerSnapshot() {} MigratedNFASerializerSnapshot(NFASerializer<T> legacyNfaSerializer) { super(legacyNfaSerializer); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java index a62071a847f..fe532be318a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java @@ -39,9 +39,7 @@ public class NFAStateSerializerSnapshot private boolean supportsPreviousTimestamp = true; /** Constructor for read instantiation. */ - public NFAStateSerializerSnapshot() { - super(NFAStateSerializer.class); - } + public NFAStateSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public NFAStateSerializerSnapshot(NFAStateSerializer serializerInstance) { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 2acbcf538d8..9285a51a881 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -159,9 +159,7 @@ public class SharedBuffer<V> { private static final int VERSION = 2; - public SharedBufferSerializerSnapshot() { - super(SharedBufferSerializer.class); - } + public SharedBufferSerializerSnapshot() {} public SharedBufferSerializerSnapshot(SharedBufferSerializer<K, V> sharedBufferSerializer) { super(sharedBufferSerializer); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java index 5881a6fa971..d2a8dd4fd7f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java @@ -31,9 +31,7 @@ public class LockableTypeSerializerSnapshot<E> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public LockableTypeSerializerSnapshot() { - super(Lockable.LockableTypeSerializer.class); - } + public LockableTypeSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public LockableTypeSerializerSnapshot( diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java index a4d3c180c56..f605e4f6236 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java @@ -167,9 +167,7 @@ public class NodeId { private static final int VERSION = 1; - public NodeIdSerializerSnapshot() { - super(NodeIdSerializer.class); - } + public NodeIdSerializerSnapshot() {} public NodeIdSerializerSnapshot(NodeIdSerializer nodeIdSerializer) { super(nodeIdSerializer); diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java index 236623d5531..1fa02645fdc 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java @@ -170,9 +170,7 @@ public class SharedBufferEdge { private static final int VERSION = 1; - public SharedBufferEdgeSerializerSnapshot() { - super(SharedBufferEdgeSerializer.class); - } + public SharedBufferEdgeSerializerSnapshot() {} public SharedBufferEdgeSerializerSnapshot( SharedBufferEdgeSerializer sharedBufferEdgeSerializer) { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java index 9f7d97da4bd..83394964047 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java @@ -171,9 +171,7 @@ public class SharedBufferNode { private static final int VERSION = 1; - public SharedBufferNodeSerializerSnapshot() { - super(SharedBufferNodeSerializer.class); - } + public SharedBufferNodeSerializerSnapshot() {} public SharedBufferNodeSerializerSnapshot( SharedBufferNodeSerializer sharedBufferNodeSerializer) { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java index 9d963e829b9..06d6ca578b4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNodeSerializerSnapshotV2.java @@ -27,9 +27,7 @@ public final class SharedBufferNodeSerializerSnapshotV2 private static final int VERSION = 1; - public SharedBufferNodeSerializerSnapshotV2() { - super(SharedBufferNodeSerializer.class); - } + public SharedBufferNodeSerializerSnapshotV2() {} public SharedBufferNodeSerializerSnapshotV2( SharedBufferNodeSerializer sharedBufferNodeSerializer) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java index 18c49333463..fb8ec70a362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java @@ -30,9 +30,7 @@ public class ArrayListSerializerSnapshot<T> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public ArrayListSerializerSnapshot() { - super(ArrayListSerializer.class); - } + public ArrayListSerializerSnapshot() {} /** Constructor for creating the snapshot for writing. */ public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 45eb2195cd8..df93d411472 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -349,9 +349,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> { private static final int VERSION = 2; @SuppressWarnings({"WeakerAccess", "unused"}) - public TtlSerializerSnapshot() { - super(TtlSerializer.class); - } + public TtlSerializerSnapshot() {} TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) { super(serializerInstance); diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java index 243d03bd44b..543e539f359 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java @@ -43,9 +43,7 @@ public final class ScalaCaseClassSerializerSnapshot<T extends scala.Product> /** Used via reflection. */ @SuppressWarnings("unused") - public ScalaCaseClassSerializerSnapshot() { - super(ScalaCaseClassSerializer.class); - } + public ScalaCaseClassSerializerSnapshot() {} /** Used for the snapshot path. */ public ScalaCaseClassSerializerSnapshot(ScalaCaseClassSerializer<T> serializerInstance) { diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java index dc88ea46a1b..a6cc00c315a 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java @@ -33,9 +33,7 @@ public class ScalaEitherSerializerSnapshot<L, R> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public ScalaEitherSerializerSnapshot() { - super(EitherSerializer.class); - } + public ScalaEitherSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) { diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java index faa4b3f8937..ab8f94654e1 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java @@ -31,9 +31,7 @@ public final class ScalaOptionSerializerSnapshot<E> private static final int VERSION = 2; @SuppressWarnings("WeakerAccess") - public ScalaOptionSerializerSnapshot() { - super(OptionSerializer.class); - } + public ScalaOptionSerializerSnapshot() {} public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) { super(serializerInstance); diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java index 4a9e44b19e5..8c59e61a673 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerSnapshot.java @@ -38,9 +38,7 @@ public class ScalaTrySerializerSnapshot<E> /** This empty nullary constructor is required for deserializing the configuration. */ @SuppressWarnings("unused") - public ScalaTrySerializerSnapshot() { - super(TrySerializer.class); - } + public ScalaTrySerializerSnapshot() {} public ScalaTrySerializerSnapshot(TrySerializer<E> trySerializer) { super(trySerializer); diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java index c7daa6c3ce0..23619b6f39c 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshot.java @@ -47,9 +47,7 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E> private String cbfCode; @SuppressWarnings("unused") - public TraversableSerializerSnapshot() { - super(TraversableSerializer.class); - } + public TraversableSerializerSnapshot() {} public TraversableSerializerSnapshot(TraversableSerializer<T, E> serializerInstance) { super(serializerInstance); @@ -57,7 +55,6 @@ public class TraversableSerializerSnapshot<T extends TraversableOnce<E>, E> } TraversableSerializerSnapshot(String cbfCode) { - super(TraversableSerializer.class); checkArgument(cbfCode != null, "cbfCode cannot be null"); this.cbfCode = cbfCode; diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java index c35a853f8b8..dc040d0f323 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/Tuple2CaseClassSerializerSnapshot.java @@ -46,9 +46,7 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2> private Class<Tuple2<T1, T2>> type; @SuppressWarnings("unused") - public Tuple2CaseClassSerializerSnapshot() { - super(correspondingSerializerClass()); - } + public Tuple2CaseClassSerializerSnapshot() {} public Tuple2CaseClassSerializerSnapshot( ScalaCaseClassSerializer<Tuple2<T1, T2>> serializerInstance) { @@ -102,9 +100,4 @@ public final class Tuple2CaseClassSerializerSnapshot<T1, T2> ? OuterSchemaCompatibility.COMPATIBLE_AS_IS : OuterSchemaCompatibility.INCOMPATIBLE; } - - private static <T1, T2> - Class<ScalaCaseClassSerializer<scala.Tuple2<T1, T2>>> correspondingSerializerClass() { - return package$.MODULE$.tuple2ClassForJava(); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index b9a253d4e25..f86ad9f3ab8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -752,9 +752,7 @@ public class CoGroupedStreams<T1, T2> { private static final int VERSION = 2; @SuppressWarnings("WeakerAccess") - public UnionSerializerSnapshot() { - super(UnionSerializer.class); - } + public UnionSerializerSnapshot() {} UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) { super(serializerInstance); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java index 46849154096..bef0b4b120d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java @@ -890,9 +890,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichS private boolean supportsNullTransaction = true; @SuppressWarnings("WeakerAccess") - public StateSerializerSnapshot() { - super(StateSerializer.class); - } + public StateSerializerSnapshot() {} StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> serializerInstance) { super(serializerInstance); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java index c85e7ca8d82..7a805611502 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java @@ -30,9 +30,7 @@ public class TimerSerializerSnapshot<K, N> private static final int VERSION = 2; - public TimerSerializerSnapshot() { - super(TimerSerializer.class); - } + public TimerSerializerSnapshot() {} public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) { super(timerSerializer); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 33376faf858..614b26979e6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -513,9 +513,7 @@ public class IntervalJoinOperator<K, T1, T2, OUT> private static final int VERSION = 2; @SuppressWarnings({"unused", "WeakerAccess"}) - public BufferEntrySerializerSnapshot() { - super(BufferEntrySerializer.class); - } + public BufferEntrySerializerSnapshot() {} BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) { super(serializerInstance); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 4140c52a9ec..f06de1b8c26 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -278,9 +278,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme private static final int VERSION = 2; @SuppressWarnings("WeakerAccess") - public StreamElementSerializerSnapshot() { - super(StreamElementSerializer.class); - } + public StreamElementSerializerSnapshot() {} StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) { super(serializerInstance); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java index a240181f11f..539e6197f14 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java @@ -39,9 +39,7 @@ public final class ListViewSerializerSnapshot<T> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public ListViewSerializerSnapshot() { - super(ListViewSerializer.class); - } + public ListViewSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java index e4b7046e83f..a2b5bc3031d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java @@ -40,9 +40,7 @@ public class MapViewSerializerSnapshot<K, V> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public MapViewSerializerSnapshot() { - super(MapViewSerializer.class); - } + public MapViewSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java index 196b94800ad..7780f750007 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/dataview/NullAwareMapSerializerSnapshot.java @@ -38,9 +38,7 @@ public class NullAwareMapSerializerSnapshot<K, V> private static final int CURRENT_VERSION = 1; /** Constructor for read instantiation. */ - public NullAwareMapSerializerSnapshot() { - super(NullAwareMapSerializer.class); - } + public NullAwareMapSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public NullAwareMapSerializerSnapshot(NullAwareMapSerializer<K, V> mapViewSerializer) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java index d9e7caabff2..4c712c25414 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ExternalSerializer.java @@ -248,9 +248,7 @@ public final class ExternalSerializer<I, E> extends TypeSerializer<E> { private boolean isInternalInput; - public ExternalSerializerSnapshot() { - super(ExternalSerializer.class); - } + public ExternalSerializerSnapshot() {} public ExternalSerializerSnapshot(ExternalSerializer<I, E> externalSerializer) { super(externalSerializer); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java index 59498a5fa6e..0720e20c8fa 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java @@ -223,9 +223,7 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>> private boolean hasNullMask = true; /** Constructor for read instantiation. */ - public LinkedListSerializerSnapshot() { - super(LinkedListSerializer.class); - } + public LinkedListSerializerSnapshot() {} /** Constructor to create the snapshot for writing. */ public LinkedListSerializerSnapshot(LinkedListSerializer<T> listSerializer) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java index 3ee21dfdb9b..6fda57c7411 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java @@ -149,9 +149,7 @@ public final class RawValueDataSerializer<T> extends TypeSerializer<RawValueData extends CompositeTypeSerializerSnapshot<RawValueData<T>, RawValueDataSerializer<T>> { @SuppressWarnings("unused") - public RawValueDataSerializerSnapshot() { - super(RawValueDataSerializer.class); - } + public RawValueDataSerializerSnapshot() {} public RawValueDataSerializerSnapshot(RawValueDataSerializer<T> serializerInstance) { super(serializerInstance); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java index fa19a393842..b4175cc7157 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/WindowKeySerializer.java @@ -177,9 +177,7 @@ public class WindowKeySerializer extends PagedTypeSerializer<WindowKey> { private static final int CURRENT_VERSION = 1; /** This empty nullary constructor is required for deserializing the configuration. */ - public WindowKeySerializerSnapshot() { - super(WindowKeySerializer.class); - } + public WindowKeySerializerSnapshot() {} public WindowKeySerializerSnapshot(WindowKeySerializer serializerInstance) { super(serializerInstance);