[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-30 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252184509
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
-   public Serializer(TypeSerializer userValueSerializer) {
-   super(true, userValueSerializer, 
LongSerializer.INSTANCE);
+   public Serializer(TypeSerializer valueSerializer, 
TypeSerializer timestampSerializer) {
 
 Review comment:
   ok, that's a fair argument that I can agree with. Since I don't have a 
strong opinion on this, I'll leave the constructors as is in this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252145959
 
 

 ##
 File path: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 ##
 @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // 

 
-  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] 
= {
-new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
-  }
-
-  override def ensureCompatibility(
-  configSnapshot: TypeSerializerConfigSnapshot[_]): 
CompatibilityResult[Option[A]] = {
-
-configSnapshot match {
-  case optionSerializerConfigSnapshot
-  : ScalaOptionSerializerConfigSnapshot[A] =>
-ensureCompatibilityInternal(optionSerializerConfigSnapshot)
-  case legacyOptionSerializerConfigSnapshot
-  : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
 
 Review comment:
   Removing this path will lead to problems when restoring from Flink 1.3, 
because this snapshot class was used back in Flink 1.3.
   
   OTOH, it should be possible to redirect `OptionSerializerConfigSnapshot`'s 
compatibility check to the new `ScalaOptionSerializerSnapshot`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252144982
 
 

 ##
 File path: 
flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
 ##
 @@ -31,6 +33,7 @@
  * allow calling different base class constructors from subclasses, while we 
need that
  * for the default empty constructor.
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252140553
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 ##
 @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) {
}
 
@Override
-   public TypeSerializerConfigSnapshot> 
snapshotConfiguration() {
-   return new 
UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
-   }
-
-   @Override
-   public CompatibilityResult> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof 
UnionSerializerConfigSnapshot) {
-   List, 
TypeSerializerSnapshot>> previousSerializersAndConfigs =
-   ((UnionSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   CompatibilityResult 
oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-   previousSerializersAndConfigs.get(0).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousSerializersAndConfigs.get(0).f1,
-   oneSerializer);
-
-   CompatibilityResult 
twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-   previousSerializersAndConfigs.get(1).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousSerializersAndConfigs.get(1).f1,
-   twoSerializer);
-
-   if 
(!oneSerializerCompatResult.isRequiresMigration() && 
!twoSerializerCompatResult.isRequiresMigration()) {
-   return CompatibilityResult.compatible();
-   } else if 
(oneSerializerCompatResult.getConvertDeserializer() != null && 
twoSerializerCompatResult.getConvertDeserializer() != null) {
-   return 
CompatibilityResult.requiresMigration(
-   new UnionSerializer<>(
-   new 
TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
-   new 
TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer(;
-   }
-   }
-
-   return CompatibilityResult.requiresMigration();
+   public TypeSerializerSnapshot> 
snapshotConfiguration() {
+   return new UnionSerializerSnapshot<>(this);
}
}
 
/**
 * The {@link TypeSerializerConfigSnapshot} for the {@link 
UnionSerializer}.
 
 Review comment:
   nit: Add `@deprecated` message


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252139206
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ##
 @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
}
 
// 

-   // Serializer configuration snapshotting & compatibility
+   // Serializer configuration snapshoting & compatibility
// 

 
@Override
-   public RowSerializerConfigSnapshot snapshotConfiguration() {
-   return new RowSerializerConfigSnapshot(fieldSerializers);
+   public TypeSerializerSnapshot snapshotConfiguration() {
+   return new RowSerializerSnapshot(this);
}
 
-   @Override
-   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-   List, 
TypeSerializerSnapshot>> previousFieldSerializersAndConfigs =
-   ((RowSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   if (previousFieldSerializersAndConfigs.size() == 
fieldSerializers.length) {
-   boolean requireMigration = false;
-   TypeSerializer[] convertDeserializers = new 
TypeSerializer[fieldSerializers.length];
-
-   CompatibilityResult compatResult;
-   int i = 0;
-   for (Tuple2, 
TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) {
-   compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-   f.f0,
-   
UnloadableDummyTypeSerializer.class,
-   f.f1,
-   fieldSerializers[i]);
-
-   if (compatResult.isRequiresMigration()) 
{
-   requireMigration = true;
-
-   if 
(compatResult.getConvertDeserializer() == null) {
-   // one of the field 
serializers cannot provide a fallback deserializer
-   return 
CompatibilityResult.requiresMigration();
-   } else {
-   convertDeserializers[i] 
=
-   new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
-   }
-   }
+   /**
+* A snapshot for {@link RowSerializer}.
+*/
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252134315
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 ##
 @@ -233,45 +247,25 @@ public int hashCode() {
}
 
@Override
-   public NullableSerializerConfigSnapshot snapshotConfiguration() {
-   return new 
NullableSerializerConfigSnapshot<>(originalSerializer);
+   public TypeSerializerSnapshot snapshotConfiguration() {
+   return new NullableSerializerSnapshot<>(this);
}
 
-   @Override
-   public CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-   if (configSnapshot instanceof NullableSerializerConfigSnapshot) 
{
-   List, 
TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-   ((NullableSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-
-   CompatibilityResult compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-   previousKvSerializersAndConfigs.get(0).f0,
-   UnloadableDummyTypeSerializer.class,
-   previousKvSerializersAndConfigs.get(0).f1,
-   originalSerializer);
-
-   if (!compatResult.isRequiresMigration()) {
-   return CompatibilityResult.compatible();
-   } else if (compatResult.getConvertDeserializer() != 
null) {
-   return CompatibilityResult.requiresMigration(
-   new NullableSerializer<>(
-   new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), 
padNullValue()));
-   }
-   }
-
-   return CompatibilityResult.requiresMigration();
-   }
 
/**
 * Configuration snapshot for serializers of nullable types, containing 
the
 * configuration snapshot of its original serializer.
 
 Review comment:
   nit: Add `@deprecated` message.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252132629
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   Ah scratch that, just realized that this is only a serializer used in tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   This serializer class previously did not have a `serialVersionUID` defined.
   Need to explicitly set it to what it was before, because I guess the serial 
version UID would have changed when adding the new constructors.
   
   OTOH, there seems to be missing a migration test for this serializer, 
because that would have caught this problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252130677
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
 Review comment:
   This serializer class previously did not have a `serialVersionUID` defined.
   Need to explicitly set it to what it was before, to be on the safe side here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128586
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, 
int index) {
protected CompositeSerializer> 
createSerializerInstance(
PrecomputedParameters precomputed,
TypeSerializer... originalSerializers) {
-   return new Serializer(precomputed, 
(TypeSerializer) originalSerializers[0]);
+
+   return new Serializer(precomputed, 
originalSerializers[0], originalSerializers[1]);
+   }
+
+   TypeSerializer getValueSerializer() {
+   return fieldSerializers[0];
+   }
+
+   @SuppressWarnings("unchecked")
+   TypeSerializer getTimestampSerializer() {
+   TypeSerializer fieldSerializer = fieldSerializers[1];
+   return (TypeSerializer) fieldSerializer;
+   }
+
+   @Override
+   public TypeSerializerSnapshot> 
snapshotConfiguration() {
+   return new ValueWithTsSerializerSnapshot(this);
+   }
+   }
+
+   /**
+* A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+*/
+   public static final class ValueWithTsSerializerSnapshot extends 
CompositeTypeSerializerSnapshot, Serializer> {
+
+   private final static int VERSION = 2;
+
+   @SuppressWarnings("unused")
+   public ValueWithTsSerializerSnapshot() {
+   super(Serializer.class);
+   }
+
+   ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+   super(serializerInstance);
+   }
+
+   @Override
+   protected int getCurrentOuterSnapshotVersion() {
+   return VERSION;
+   }
+
+   @Override
+   protected TypeSerializer[] getNestedSerializers(Serializer 
outerSerializer) {
+   return new 
TypeSerializer[]{outerSerializer.getValueSerializer(), 
outerSerializer.getTimestampSerializer()};
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   protected Serializer 
createOuterSerializerWithNestedSerializers(TypeSerializer[] 
nestedSerializers) {
+   TypeSerializer valueSerializer = 
nestedSerializers[0];
+   TypeSerializer timeSerializer = 
(TypeSerializer) nestedSerializers[1];
 
 Review comment:
   nit: `time` --> `timestamp` for naming consistency


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128501
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 ##
 @@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends 
CompositeSerializer> {
 
-   public Serializer(TypeSerializer userValueSerializer) {
-   super(true, userValueSerializer, 
LongSerializer.INSTANCE);
+   public Serializer(TypeSerializer valueSerializer, 
TypeSerializer timestampSerializer) {
 
 Review comment:
   I don't think we need a public constructor that accepts the timestamp 
serializer.
   This should be a private constructor used only by the snapshot class.
   
   We should still have a public constructor that accepts the user value 
serializer, and by default just uses `LongSerializer.INSTANCE` as the new 
timestamp serializer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252128802
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -126,15 +128,15 @@ private IS createState() throws Exception {
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor> ttlDescriptor = new 
ValueStateDescriptor<>(
-   stateDesc.getName(), new 
TtlSerializer<>(stateDesc.getSerializer()));
+   stateDesc.getName(), new 
TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 
 Review comment:
   As mentioned above, having to pass in a `LongSerializer.INSTANCE` every time 
we're instantiating a TtlSerializer seems very redundant.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #7590: [FLINK-11329][core] Migrating CompositeSerializers

2019-01-29 Thread GitBox
tzulitai commented on a change in pull request #7590: [FLINK-11329][core] 
Migrating CompositeSerializers
URL: https://github.com/apache/flink/pull/7590#discussion_r252127263
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ##
 @@ -302,6 +299,7 @@ static PrecomputedParameters precompute(
}
 
/** Snapshot field serializers of composite type. */
 
 Review comment:
   nit: Add `@deprecated` message and direct to new snapshot class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services