tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers URL: https://github.com/apache/flink/pull/7590#discussion_r252128586
########## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ########## @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs<?> value, int index) { protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) { - return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]); + + return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]); + } + + TypeSerializer<?> getValueSerializer() { + return fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer<Long> getTimestampSerializer() { + TypeSerializer<?> fieldSerializer = fieldSerializers[1]; + return (TypeSerializer<Long>) fieldSerializer; + } + + @Override + public TypeSerializerSnapshot<ValueWithTs<?>> snapshotConfiguration() { + return new ValueWithTsSerializerSnapshot(this); + } + } + + /** + * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer. + */ + public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot<ValueWithTs<?>, Serializer> { + + private final static int VERSION = 2; + + @SuppressWarnings("unused") + public ValueWithTsSerializerSnapshot() { + super(Serializer.class); + } + + ValueWithTsSerializerSnapshot(Serializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(Serializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()}; + } + + @SuppressWarnings("unchecked") + @Override + protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + TypeSerializer<?> valueSerializer = nestedSerializers[0]; + TypeSerializer<Long> timeSerializer = (TypeSerializer<Long>) nestedSerializers[1]; Review comment: nit: `time` --> `timestamp` for naming consistency ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services