tzulitai commented on a change in pull request #12263:
URL: https://github.com/apache/flink/pull/12263#discussion_r427782378
##
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+ this.version = translateVersion(serializerInstance);
}
@Override
protected int getCurrentOuterSnapshotVersion() {
- return VERSION;
+ return version;
Review comment:
This method is only relevant when writing snapshots; it is not used on
restore.
It should therefore always return the latest version, not the older version
that was read.
```suggestion
return VERSION;
```
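To illustrate the split between the written version and the read version, here is a minimal, self-contained sketch. It does not use Flink's `CompositeTypeSerializerSnapshot`; `SnapshotVersionSketch`, `RowSnapshotSketch`, and `isLegacyFormat` are hypothetical names, and `readOuterSnapshot` is reduced to a single `int` argument for brevity. The field name `readVersion` follows the rename proposed elsewhere in this review.

```java
// Hypothetical stand-in for illustration only; not Flink's actual classes.
public class SnapshotVersionSketch {

    static final class RowSnapshotSketch {
        private static final int VERSION = 3;
        private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

        // Version found in a restored snapshot; defaults to the latest.
        private int readVersion = VERSION;

        // Write path: always report the latest format version.
        int getCurrentOuterSnapshotVersion() {
            return VERSION;
        }

        // Restore path (simplified signature): remember the version that was read.
        void readOuterSnapshot(int readOuterSnapshotVersion) {
            this.readVersion = readOuterSnapshotVersion;
        }

        // True if the restored snapshot predates the row kind field.
        boolean isLegacyFormat() {
            return readVersion <= LAST_VERSION_WITHOUT_ROW_KIND;
        }
    }

    public static void main(String[] args) {
        RowSnapshotSketch restored = new RowSnapshotSketch();
        restored.readOuterSnapshot(2);
        // The written version stays at VERSION even after reading an older snapshot.
        System.out.println(restored.getCurrentOuterSnapshotVersion());
        System.out.println(restored.isLegacyFormat());
    }
}
```

With this shape, restoring an old snapshot only affects how the data is interpreted; any snapshot written afterwards is still stamped with the latest version.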
##
File path:
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java
##
@@ -76,14 +84,16 @@ public RowSerializerUpgradeTest(TestSpecification
testSpecification) {
public static final class RowSerializerSetup implements
TypeSerializerUpgradeTestBase.PreUpgradeSetup {
@Override
public TypeSerializer createPriorSerializer() {
- return stringLongRowSupplier();
+ return createRowSerializer(true);
Review comment:
To make this clear, I think the `RowSerializer` constructor that accepts the
`legacyModeEnabled` flag should be private, so that it is only usable by
`RowSerializerSnapshot#createOuterSerializer`. This concern should not leak
into tests.
The bottom line is that creating an old serializer with a previous format
should only be visible to the snapshots.
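A rough sketch of that arrangement, using hypothetical stand-in classes rather than the real `RowSerializer` (all `*Sketch` names and the simplified `createOuterSerializer` signature are assumptions for illustration). It relies on the Java rule that a nested class can call a private constructor of its enclosing class:

```java
// Hypothetical stand-ins for illustration only; not Flink's actual classes.
public class LegacyConstructorSketch {

    static final class RowSerializerSketch {
        private final boolean legacyModeEnabled;

        // Regular entry point: always the current format.
        RowSerializerSketch() {
            this(false);
        }

        // Private: only code nested in this class, such as the snapshot, can ask for legacy mode.
        private RowSerializerSketch(boolean legacyModeEnabled) {
            this.legacyModeEnabled = legacyModeEnabled;
        }

        boolean isLegacyModeEnabled() {
            return legacyModeEnabled;
        }

        static final class SnapshotSketch {
            private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;

            private final int readVersion;

            SnapshotSketch(int readVersion) {
                this.readVersion = readVersion;
            }

            // The snapshot decides, from the version it read, whether legacy mode is needed.
            RowSerializerSketch createOuterSerializer() {
                return new RowSerializerSketch(readVersion <= LAST_VERSION_WITHOUT_ROW_KIND);
            }
        }
    }

    public static void main(String[] args) {
        RowSerializerSketch restored =
            new RowSerializerSketch.SnapshotSketch(2).createOuterSerializer();
        System.out.println(restored.isLegacyModeEnabled());
    }
}
```

Tests would then obtain a legacy serializer only by restoring from an old snapshot, never by passing the flag themselves.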
##
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
Review comment:
Maybe rename this to `readVersion`, to better convey how it differs from
the static `VERSION`.
##
File path:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##
@@ -367,36 +367,42 @@ public int getVersion() {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
- // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends
CompositeTypeSerializerSnapshot {
private static final int VERSION = 3;
- private static final int VERSION_WITHOUT_ROW_KIND = 2;
+ private static final int LAST_VERSION_WITHOUT_ROW_KIND = 2;
- private boolean legacyModeEnabled = false;
+ private int version = VERSION;
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
RowSerializerSnapshot(RowSerializer serializerInstance) {
super(serializerInstance);
+ this.version = translateVersion(serializerInstance);
}
@Override
protected int getCurrentOuterSnapshotVersion() {
- return VERSION;
+ return version;
}
@Override
protected void readOuterSnapshot(
int readOuterSnapshotVersion,
DataInputView in,
ClassLoader userCodeClassLoader) {
- if