[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252184509

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -56,8 +57,8 @@ public String toString() {
 /** Serializer for Serializer. */
 public static class Serializer extends CompositeSerializer> {

- public Serializer(TypeSerializer userValueSerializer) {
- super(true, userValueSerializer, LongSerializer.INSTANCE);
+ public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) {

Review comment:
ok, that's a fair argument that I can agree with. Since I don't have a strong opinion on this, I'll leave the constructors as is in this PR.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252145959

## File path: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
## @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
 // Serializer configuration snapshotting & compatibility
 //

- override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
-new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
- }
-
- override def ensureCompatibility(
- configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {
-
-configSnapshot match {
- case optionSerializerConfigSnapshot
- : ScalaOptionSerializerConfigSnapshot[A] =>
-ensureCompatibilityInternal(optionSerializerConfigSnapshot)
- case legacyOptionSerializerConfigSnapshot
- : OptionSerializer.OptionSerializerConfigSnapshot[A] =>

Review comment:
Removing this path will lead to problems when restoring from Flink 1.3, because this snapshot class was used back in Flink 1.3. OTOH, it should be possible to redirect `OptionSerializerConfigSnapshot`'s compatibility check to the new `ScalaOptionSerializerSnapshot`.
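The redirect suggested in this comment can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `Compatibility`, `OptionCodec`, `Snapshot`, `NewOptionSnapshot`, and `LegacyOptionSnapshot` are hypothetical stand-ins invented for illustration and are not Flink classes or Flink's API. The idea shown is that the legacy snapshot class stays loadable for old savepoints but forwards its compatibility check to the new snapshot class, so the logic lives in one place.

```java
// Hypothetical stand-ins for illustration only; none of these types are Flink classes.
enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

/** Stand-in for a serializer; only the element type name matters in this sketch. */
final class OptionCodec {
    final String elementType;

    OptionCodec(String elementType) {
        this.elementType = elementType;
    }
}

/** Stand-in for a serializer snapshot that can judge a new serializer. */
interface Snapshot {
    Compatibility resolveCompatibility(OptionCodec newCodec);
}

/** The new-style snapshot, which owns the actual compatibility logic. */
final class NewOptionSnapshot implements Snapshot {
    private final String elementType;

    NewOptionSnapshot(String elementType) {
        this.elementType = elementType;
    }

    @Override
    public Compatibility resolveCompatibility(OptionCodec newCodec) {
        return elementType.equals(newCodec.elementType)
            ? Compatibility.COMPATIBLE_AS_IS
            : Compatibility.INCOMPATIBLE;
    }
}

/** The legacy snapshot stays readable, but forwards the check to the new one. */
final class LegacyOptionSnapshot implements Snapshot {
    private final String elementType;

    LegacyOptionSnapshot(String elementType) {
        this.elementType = elementType;
    }

    @Override
    public Compatibility resolveCompatibility(OptionCodec newCodec) {
        // Redirect: build the new snapshot from the legacy data and delegate to it.
        return new NewOptionSnapshot(elementType).resolveCompatibility(newCodec);
    }
}

public class LegacySnapshotRedirectDemo {
    public static void main(String[] args) {
        Snapshot restored = new LegacyOptionSnapshot("String");
        System.out.println(restored.resolveCompatibility(new OptionCodec("String")));
        System.out.println(restored.resolveCompatibility(new OptionCodec("Long")));
    }
}
```

With this shape, a savepoint written with the legacy snapshot class is still judged by the same rules as one written with the new class.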
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252144982

## File path: flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
## @@ -31,6 +33,7 @@
 * allow calling different base class constructors from subclasses, while we need that
 * for the default empty constructor.

Review comment:
nit: Add `@deprecated` message.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252140553

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
## @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) {
 }

 @Override
- public TypeSerializerConfigSnapshot> snapshotConfiguration() {
- return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
- }
-
- @Override
- public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
- List, TypeSerializerSnapshot>> previousSerializersAndConfigs =
- ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
- CompatibilityResult oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousSerializersAndConfigs.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializersAndConfigs.get(0).f1,
- oneSerializer);
-
- CompatibilityResult twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousSerializersAndConfigs.get(1).f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializersAndConfigs.get(1).f1,
- twoSerializer);
-
- if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new UnionSerializer<>(
- new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
- new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
- }
- }
-
- return CompatibilityResult.requiresMigration();
+ public TypeSerializerSnapshot> snapshotConfiguration() {
+ return new UnionSerializerSnapshot<>(this);
 }
 }

 /**
 * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.

Review comment:
nit: Add `@deprecated` message.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252139206

## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
## @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
 }

 //
- // Serializer configuration snapshotting & compatibility
+ // Serializer configuration snapshoting & compatibility
 //

 @Override
- public RowSerializerConfigSnapshot snapshotConfiguration() {
- return new RowSerializerConfigSnapshot(fieldSerializers);
+ public TypeSerializerSnapshot snapshotConfiguration() {
+ return new RowSerializerSnapshot(this);
 }

- @Override
- public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof RowSerializerConfigSnapshot) {
- List, TypeSerializerSnapshot>> previousFieldSerializersAndConfigs =
- ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
- if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
- boolean requireMigration = false;
- TypeSerializer[] convertDeserializers = new TypeSerializer[fieldSerializers.length];
-
- CompatibilityResult compatResult;
- int i = 0;
- for (Tuple2, TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) {
- compatResult = CompatibilityUtil.resolveCompatibilityResult(
- f.f0,
- UnloadableDummyTypeSerializer.class,
- f.f1,
- fieldSerializers[i]);
-
- if (compatResult.isRequiresMigration()) {
- requireMigration = true;
-
- if (compatResult.getConvertDeserializer() == null) {
- // one of the field serializers cannot provide a fallback deserializer
- return CompatibilityResult.requiresMigration();
- } else {
- convertDeserializers[i] =
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
- }
- }
+ /**
+* A snapshot for {@link RowSerializer}.
+*/

Review comment:
nit: Add `@deprecated` message.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252134315

## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
## @@ -233,45 +247,25 @@ public int hashCode() {
 }

 @Override
- public NullableSerializerConfigSnapshot snapshotConfiguration() {
- return new NullableSerializerConfigSnapshot<>(originalSerializer);
+ public TypeSerializerSnapshot snapshotConfiguration() {
+ return new NullableSerializerSnapshot<>(this);
 }

- @Override
- public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
- List, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
- ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
- CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousKvSerializersAndConfigs.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- previousKvSerializersAndConfigs.get(0).f1,
- originalSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new NullableSerializer<>(
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
- }
- }
-
- return CompatibilityResult.requiresMigration();
- }

 /**
 * Configuration snapshot for serializers of nullable types, containing the
 * configuration snapshot of its original serializer.

Review comment:
nit: Add `@deprecated` message.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252132629

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -56,8 +57,8 @@ public String toString() {
 /** Serializer for Serializer. */
 public static class Serializer extends CompositeSerializer> {

Review comment:
Ah scratch that, just realized that this is only a serializer used in tests.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -56,8 +57,8 @@ public String toString() {
 /** Serializer for Serializer. */
 public static class Serializer extends CompositeSerializer> {

Review comment:
This serializer class previously did not define a `serialVersionUID`. We need to explicitly set it to what it was before, because I guess the serial version UID would have changed when adding the new constructors. OTOH, a migration test for this serializer seems to be missing; it would have caught this problem.
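As background for the concern above: when a `Serializable` class declares no `serialVersionUID`, Java serialization derives one from the class structure, and per the Java Object Serialization Specification that computation includes the non-private constructors. The sketch below uses the JDK's `ObjectStreamClass.lookup(...).getSerialVersionUID()` to read the effective UID. `PinnedSerializer`, `UnpinnedSerializer`, and `SerialVersionUidDemo` are hypothetical names for illustration and are not classes from the PR. Running the same lookup against the class as it was before the change would be one way to find the value to pin.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical serializer-like class that pins its serial version UID explicitly. */
class PinnedSerializer implements Serializable {
    // Explicit value: unaffected by constructors or methods added later.
    private static final long serialVersionUID = 1L;

    PinnedSerializer() {}

    PinnedSerializer(int ignored) {}
}

/** Same shape without an explicit UID: the runtime derives one from the class structure. */
class UnpinnedSerializer implements Serializable {
    UnpinnedSerializer() {}
}

public class SerialVersionUidDemo {
    /** Returns the UID that Java serialization would use for the given Serializable class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("pinned:   " + uidOf(PinnedSerializer.class));
        System.out.println("unpinned: " + uidOf(UnpinnedSerializer.class));
    }
}
```

The pinned class always reports `1`, while the unpinned value depends on the class's declared members, which is why adding constructors to a class without an explicit UID can break Java-serialized instances.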
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -56,8 +57,8 @@ public String toString() {
 /** Serializer for Serializer. */
 public static class Serializer extends CompositeSerializer> {

Review comment:
This serializer class previously did not have a `serialVersionUID` defined. Need to explicitly set it to what it was before, to be on the safe side here.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128586

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, int index) {
 protected CompositeSerializer> createSerializerInstance(
 PrecomputedParameters precomputed,
 TypeSerializer... originalSerializers) {
- return new Serializer(precomputed, (TypeSerializer) originalSerializers[0]);
+
+ return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]);
+ }
+
+ TypeSerializer getValueSerializer() {
+ return fieldSerializers[0];
+ }
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer getTimestampSerializer() {
+ TypeSerializer fieldSerializer = fieldSerializers[1];
+ return (TypeSerializer) fieldSerializer;
+ }
+
+ @Override
+ public TypeSerializerSnapshot> snapshotConfiguration() {
+ return new ValueWithTsSerializerSnapshot(this);
+ }
+ }
+
+ /**
+* A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+*/
+ public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot, Serializer> {
+
+ private final static int VERSION = 2;
+
+ @SuppressWarnings("unused")
+ public ValueWithTsSerializerSnapshot() {
+ super(Serializer.class);
+ }
+
+ ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected TypeSerializer[] getNestedSerializers(Serializer outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()};
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) {
+ TypeSerializer valueSerializer = nestedSerializers[0];
+ TypeSerializer timeSerializer = (TypeSerializer) nestedSerializers[1];

Review comment:
nit: `time` --> `timestamp` for naming consistency
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128501

## File path: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
## @@ -56,8 +57,8 @@ public String toString() {
 /** Serializer for Serializer. */
 public static class Serializer extends CompositeSerializer> {

- public Serializer(TypeSerializer userValueSerializer) {
- super(true, userValueSerializer, LongSerializer.INSTANCE);
+ public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) {

Review comment:
I don't think we need a public constructor that accepts the timestamp serializer. This should be a private constructor used only by the snapshot class. We should still have a public constructor that accepts the user value serializer, and by default just uses `LongSerializer.INSTANCE` as the new timestamp serializer.
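The constructor layout proposed in this comment might look roughly like the sketch below. It is a self-contained illustration under stated assumptions, not the PR's code: `Codec`, `LongCodec`, `StringCodec`, and `ValueWithTsCodec` are hypothetical stand-ins for `TypeSerializer`, `LongSerializer`, a user value serializer, and the `Serializer` class. Because the sketch uses top-level classes, the restricted constructor is package-private here; in a nested-class layout like the one in the diff it could be `private`, since sibling nested classes can call each other's private constructors.

```java
/** Hypothetical stand-in for a serializer type; not Flink's TypeSerializer. */
interface Codec<T> {}

/** Hypothetical stand-in for a stateless singleton serializer such as LongSerializer.INSTANCE. */
final class LongCodec implements Codec<Long> {
    static final LongCodec INSTANCE = new LongCodec();

    private LongCodec() {}
}

/** Hypothetical user value serializer. */
final class StringCodec implements Codec<String> {}

/** Composite serializer for a (value, timestamp) pair. */
final class ValueWithTsCodec<T> {
    private final Codec<T> valueCodec;
    private final Codec<Long> timestampCodec;

    /** Public entry point: callers only choose the value serializer. */
    public ValueWithTsCodec(Codec<T> valueCodec) {
        this(valueCodec, LongCodec.INSTANCE);
    }

    /** Restricted constructor, intended for the snapshot class when restoring. */
    ValueWithTsCodec(Codec<T> valueCodec, Codec<Long> timestampCodec) {
        this.valueCodec = valueCodec;
        this.timestampCodec = timestampCodec;
    }

    Codec<T> getValueCodec() {
        return valueCodec;
    }

    Codec<Long> getTimestampCodec() {
        return timestampCodec;
    }
}

public class ConstructorDesignDemo {
    public static void main(String[] args) {
        ValueWithTsCodec<String> codec = new ValueWithTsCodec<>(new StringCodec());
        // The timestamp serializer defaults to the shared singleton.
        System.out.println(codec.getTimestampCodec() == LongCodec.INSTANCE);
    }
}
```

The trade-off is that ordinary call sites stay short, while the snapshot class can still rebuild the serializer from restored nested serializers.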
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128802

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
## @@ -126,15 +128,15 @@ private IS createState() throws Exception {
 @SuppressWarnings("unchecked")
 private IS createValueState() throws Exception {
 ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>(
- stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
+ stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));

Review comment:
As mentioned above, having to pass in a `LongSerializer.INSTANCE` every time we're instantiating a TtlSerializer seems very redundant.
[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252127263

## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
## @@ -302,6 +299,7 @@ static PrecomputedParameters precompute(
 }

 /** Snapshot field serializers of composite type. */

Review comment:
nit: Add `@deprecated` message and direct to new snapshot class.
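For the `@deprecated` nits in this thread, a Javadoc that both marks the class and points readers at its replacement could take the shape below. This is a sketch only: `NewCompositeSnapshot` and `LegacyCompositeSnapshot` are hypothetical names, and the wording of the message is an assumption rather than text from the PR.

```java
/** Hypothetical replacement snapshot class. */
class NewCompositeSnapshot {}

/**
 * Snapshot field serializers of composite type.
 *
 * @deprecated this snapshot class is kept only so that older savepoints can still be read;
 *             use {@link NewCompositeSnapshot} instead.
 */
@Deprecated
class LegacyCompositeSnapshot {}

@SuppressWarnings("deprecation")
public class DeprecatedJavadocDemo {
    public static void main(String[] args) {
        // The annotation is visible at runtime; the Javadoc tag is for readers and tooling.
        System.out.println(LegacyCompositeSnapshot.class.isAnnotationPresent(Deprecated.class));
    }
}
```

Pairing the `@Deprecated` annotation with the `@deprecated` Javadoc tag lets the compiler warn at use sites while the Javadoc tells readers where to go instead.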