tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128586
 
 

 ##########
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##########
 @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs<?> value, 
int index) {
                protected CompositeSerializer<ValueWithTs<?>> 
createSerializerInstance(
                                PrecomputedParameters precomputed,
                                TypeSerializer<?>... originalSerializers) {
-                       return new Serializer(precomputed, 
(TypeSerializer<Object>) originalSerializers[0]);
+
+                       return new Serializer(precomputed, 
originalSerializers[0], originalSerializers[1]);
+               }
+
+               TypeSerializer<?> getValueSerializer() {
+                       return fieldSerializers[0];
+               }
+
+               @SuppressWarnings("unchecked")
+               TypeSerializer<Long> getTimestampSerializer() {
+                       TypeSerializer<?> fieldSerializer = fieldSerializers[1];
+                       return (TypeSerializer<Long>) fieldSerializer;
+               }
+
+               @Override
+               public TypeSerializerSnapshot<ValueWithTs<?>> 
snapshotConfiguration() {
+                       return new ValueWithTsSerializerSnapshot(this);
+               }
+       }
+
+       /**
+        * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+        */
+       public static final class ValueWithTsSerializerSnapshot extends 
CompositeTypeSerializerSnapshot<ValueWithTs<?>, Serializer> {
+
+               private final static int VERSION = 2;
+
+               @SuppressWarnings("unused")
+               public ValueWithTsSerializerSnapshot() {
+                       super(Serializer.class);
+               }
+
+               ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+                       super(serializerInstance);
+               }
+
+               @Override
+               protected int getCurrentOuterSnapshotVersion() {
+                       return VERSION;
+               }
+
+               @Override
+               protected TypeSerializer<?>[] getNestedSerializers(Serializer 
outerSerializer) {
+                       return new 
TypeSerializer[]{outerSerializer.getValueSerializer(), 
outerSerializer.getTimestampSerializer()};
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               protected Serializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+                       TypeSerializer<?> valueSerializer = 
nestedSerializers[0];
+                       TypeSerializer<Long> timeSerializer = 
(TypeSerializer<Long>) nestedSerializers[1];
 
 Review comment:
   nit: rename `timeSerializer` to `timestampSerializer` for naming consistency with `getTimestampSerializer()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to