[GitHub] [flink] tzulitai commented on a change in pull request #12263: [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer

2020-05-20 Thread GitBox


tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427970574



##
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,21 +367,21 @@ public int getVersion() {
/**
 * A {@link TypeSerializerSnapshot} for RowSerializer.
 */
-   // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot {
 
private static final int VERSION = 3;
 
-   private static final int VERSION_WITHOUT_ROW_KIND = 2;
+   private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-   private boolean legacyModeEnabled = false;
+   private int readVersion = VERSION;
 
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
 
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+   this.readVersion = serializerInstance.legacyModeEnabled ? LAST_VERSION_WITHOUT_ROW_KIND : VERSION;

Review comment:
   I don't think this line is needed, unless I'm missing something in the tests.
   The read version should only ever change if this snapshot was created by restoring from a snapshot.
   This constructor is only ever used to create a new snapshot when checkpointing occurs, so the read version should stay at its default value (`VERSION`).
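
A minimal sketch of the point above, using simplified stand-in classes rather than the real Flink types. `ReadVersionSketch`, `SerializerStub` and `getReadVersion` are hypothetical names, and `readOuterSnapshot` is reduced to a single argument here; only the constant and field names come from the diff.

```java
// Simplified stand-in for RowSerializerSnapshot; NOT the real Flink class.
final class ReadVersionSketch {

    static final int VERSION = 3;
    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    /** Hypothetical placeholder for a live serializer instance. */
    static final class SerializerStub {}

    static final class Snapshot {
        // Defaults to the latest version; only the restore path changes it.
        private int readVersion = VERSION;

        // Restore path: an empty snapshot that readOuterSnapshot fills in later.
        Snapshot() {}

        // Write path (checkpointing): wraps a live serializer and
        // deliberately leaves readVersion at its default.
        Snapshot(SerializerStub serializerInstance) {}

        // Called only on restore, with the version found in the stored snapshot.
        // (Assumption: the real method takes more arguments than this.)
        void readOuterSnapshot(int readOuterSnapshotVersion) {
            this.readVersion = readOuterSnapshotVersion;
        }

        int getReadVersion() {
            return readVersion;
        }
    }

    public static void main(String[] args) {
        Snapshot fresh = new Snapshot(new SerializerStub());
        System.out.println("new snapshot readVersion = " + fresh.getReadVersion());

        Snapshot restored = new Snapshot();
        restored.readOuterSnapshot(LAST_VERSION_WITHOUT_ROW_KIND);
        System.out.println("restored snapshot readVersion = " + restored.getReadVersion());
    }
}
```

In this sketch the serializer-taking constructor never assigns `readVersion`, which is the behaviour the comment asks for.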

##
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -60,7 +60,7 @@
 
public static final int ROW_KIND_OFFSET = 2;
 
-   private static final long serialVersionUID = 2L;
+   private static final long serialVersionUID = 1L; // legacy, don't touch

Review comment:
   nit: maybe add a comment that this can only be touched after support for 
1.9 savepoints is ditched.









[GitHub] [flink] tzulitai commented on a change in pull request #12263: [FLINK-16998][core] Support backwards compatibility for upgraded RowSerializer

2020-05-20 Thread GitBox


tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427782378



##
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
 * A {@link TypeSerializerSnapshot} for RowSerializer.
 */
-   // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot {
 
private static final int VERSION = 3;
 
-   private static final int VERSION_WITHOUT_ROW_KIND = 2;
+   private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-   private boolean legacyModeEnabled = false;
+   private int version = VERSION;
 
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
 
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+   this.version = translateVersion(serializerInstance);
}
 
@Override
protected int getCurrentOuterSnapshotVersion() {
-   return VERSION;
+   return version;

Review comment:
   This method is only relevant when writing snapshots; it is not used on restore.
   It should therefore always return the latest version, not the older version that was read.
   ```suggestion
return VERSION;
   ```
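
To illustrate why the written version should stay fixed, here is a rough sketch under simplifying assumptions: plain `java.io` streams stand in for Flink's data views, and `WriteVersionSketch`, `bytesWithVersion` and `rewrittenVersionAfterRestoring` are hypothetical helpers. It restores a snapshot stored with an older version, writes it out again, and reads back the version that was stamped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified stand-in for the snapshot's version handling; NOT the real Flink class.
final class WriteVersionSketch {

    static final int VERSION = 3;
    static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

    static final class Snapshot {
        private int readVersion = VERSION;

        // Write path only: always report the latest format version.
        int getCurrentOuterSnapshotVersion() {
            return VERSION;
        }

        void write(DataOutputStream out) throws IOException {
            out.writeInt(getCurrentOuterSnapshotVersion());
        }

        // Restore path: remember which version the stored bytes were written with.
        void read(DataInputStream in) throws IOException {
            readVersion = in.readInt();
        }

        int getReadVersion() {
            return readVersion;
        }
    }

    // Hypothetical helper: bytes of a stored snapshot carrying the given version.
    static byte[] bytesWithVersion(int version) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            new DataOutputStream(buffer).writeInt(version);
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restores from storedVersion, writes a new snapshot, returns the version it carries.
    static int rewrittenVersionAfterRestoring(int storedVersion) {
        try {
            Snapshot restored = new Snapshot();
            restored.read(new DataInputStream(
                    new ByteArrayInputStream(bytesWithVersion(storedVersion))));

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            restored.write(new DataOutputStream(buffer));
            return new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("version written after restoring an older snapshot = "
                + rewrittenVersionAfterRestoring(LAST_VERSION_WITHOUT_ROW_KIND));
    }
}
```

If `getCurrentOuterSnapshotVersion` returned the read version instead, a job restored from an old savepoint would keep stamping the old version on snapshots that are actually written in the new format.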

##
File path: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification testSpecification) {
public static final class RowSerializerSetup implements TypeSerializerUpgradeTestBase.PreUpgradeSetup {
@Override
public TypeSerializer createPriorSerializer() {
-   return stringLongRowSupplier();
+   return createRowSerializer(true);

Review comment:
   To really clarify this, I think we should make the `RowSerializer` constructor that accepts the `legacyModeEnabled` flag private, so that it is usable only by `RowSerializerSnapshot#createOuterSerializer`. This concern should not leak into tests.
   
   The bottom line is that creating an old serializer with a previous format should be a concern visible only to the snapshots.
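
One possible shape for this, as a sketch with stand-in classes rather than the actual Flink code: a nested snapshot class can call a private constructor of its enclosing class, so the legacy flag never appears in any signature that other callers can reach. `RowSerializerSketch`, `isLegacyModeEnabled` and the no-argument constructor are assumptions made for illustration, and the real constructor's field serializers are omitted.

```java
// Simplified stand-in for RowSerializer and its nested snapshot; NOT the real Flink classes.
final class RowSerializerSketch {

    private final boolean legacyModeEnabled;

    // Entry point for regular callers (including tests): always the current format.
    RowSerializerSketch() {
        this(false);
    }

    // Private: only code inside this class, including the nested Snapshot, can pick legacy mode.
    private RowSerializerSketch(boolean legacyModeEnabled) {
        this.legacyModeEnabled = legacyModeEnabled;
    }

    boolean isLegacyModeEnabled() {
        return legacyModeEnabled;
    }

    static final class Snapshot {
        static final int VERSION = 3;
        static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

        private int readVersion = VERSION;

        // Restore path: records the version of the stored snapshot.
        void readOuterSnapshot(int readOuterSnapshotVersion) {
            this.readVersion = readOuterSnapshotVersion;
        }

        // Restored serializers use the legacy format only if the stored
        // snapshot predates the row-kind change.
        RowSerializerSketch createOuterSerializer() {
            return new RowSerializerSketch(readVersion <= LAST_VERSION_WITHOUT_ROW_KIND);
        }
    }

    public static void main(String[] args) {
        Snapshot old = new Snapshot();
        old.readOuterSnapshot(Snapshot.LAST_VERSION_WITHOUT_ROW_KIND);
        System.out.println("restored from old snapshot, legacy mode = "
                + old.createOuterSerializer().isLegacyModeEnabled());
    }
}
```

With this layout a test can still obtain a legacy serializer, but only by restoring it from an old snapshot, which is the path the upgrade test is meant to exercise.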

##
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
 * A {@link TypeSerializerSnapshot} for RowSerializer.
 */
-   // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot {
 
private static final int VERSION = 3;
 
-   private static final int VERSION_WITHOUT_ROW_KIND = 2;
+   private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-   private boolean legacyModeEnabled = false;
+   private int version = VERSION;

Review comment:
   Maybe rename this to `readVersion`, to better convey how it differs from the static `VERSION`.

##
File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
 * A {@link TypeSerializerSnapshot} for RowSerializer.
 */
-   // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot {
 
private static final int VERSION = 3;
 
-   private static final int VERSION_WITHOUT_ROW_KIND = 2;
+   private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
 
-   private boolean legacyModeEnabled = false;
+   private int version = VERSION;
 
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
 
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+   this.version = translateVersion(serializerInstance);
}
 
@Override
protected int getCurrentOuterSnapshotVersion() {
-   return VERSION;
+   return version;
}
 
@Override
protected void readOuterSnapshot(
int readOuterSnapshotVersion,
DataInputView in,
ClassLoader userCodeClassLoader) {
-   if