http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 7152bfc..96025fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -43,6 +43,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -71,7 +73,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -642,6 +643,139 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.value(); } + @Test + public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + + // register A first then B + env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class); + env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); + + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // ============== create snapshot of current configuration ============== + + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo"))); + + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar"))); + + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + // ========== restore snapshot, with a different registration order in the configuration ========== + + env = new DummyEnvironment("test", 1, 0); + + env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); // this time register B first + env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class); + + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + + snapshot.discardState(); + + // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise + // initializeSerializerUnlessSet would not pick up our new config + kvId = new ValueStateDescriptor<>("id", pojoType); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + + // update to test state backends that eagerly serialize, such as RocksDB + state.update(new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar"))); + + // this tests backends that lazily serialize, such as memory state backend + runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + } + + @Test + public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + + // register A first then B + env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); + env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); + + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + TypeInformation<TestPojo> pojoType = TypeExtractor.getForClass(TestPojo.class); + + // make sure that we are in fact using the PojoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof PojoSerializer); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // ============== create snapshot of current configuration ============== + + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo"))); + + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar"))); + + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + // ========== restore snapshot, with a different registration order in the configuration ========== + + env = new DummyEnvironment("test", 1, 0); + + env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this time register B first + env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); + + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + + snapshot.discardState(); + + // re-initialize to ensure that we create the PojoSerializer from scratch, otherwise + // initializeSerializerUnlessSet would not pick up our new config + kvId = new ValueStateDescriptor<>("id", pojoType); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + + // update to test state backends that eagerly serialize, such as RocksDB + state.update(new TestPojo("u1", 11, new TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar"))); + + // this tests backends that lazily serialize, such as memory state backend + runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + } @Test @SuppressWarnings("unchecked") @@ -1696,8 +1830,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.value(); fail("should recognize wrong serializers"); - } catch (IOException e) { - if (!e.getMessage().contains("Trying to access state using wrong")) { + } catch (RuntimeException e) { + if (!e.getMessage().contains("State migration currently isn't supported")) { fail("wrong exception " + e); } // expected @@ -1747,8 +1881,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.get(); fail("should recognize wrong serializers"); - } catch (IOException e) { - if (!e.getMessage().contains("Trying to access state using wrong")) { + } catch (RuntimeException e) { + if (!e.getMessage().contains("State migration currently isn't supported")) { fail("wrong exception " + e); } // expected @@ -1800,8 +1934,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.get(); fail("should recognize wrong serializers"); - } catch (IOException e) { - if (!e.getMessage().contains("Trying to access state using wrong ")) { + } catch (RuntimeException e) { + if (!e.getMessage().contains("State migration currently isn't supported")) { fail("wrong exception " + e); } // expected @@ -1851,8 +1985,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.entries(); fail("should recognize wrong serializers"); - } catch (IOException e) { - if (!e.getMessage().contains("Trying to access state using wrong ")) { + } catch (RuntimeException e) { + if (!e.getMessage().contains("State migration currently isn't supported")) { fail("wrong exception " + e); } // expected @@ -2382,15 +2516,27 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten return snapshotRunnableFuture.get(); } - private static class TestPojo implements Serializable { + public static class TestPojo implements Serializable { private String strField; private Integer intField; + private TestNestedPojoClassA kryoClassAField; + private TestNestedPojoClassB kryoClassBField; + public TestPojo() {} public TestPojo(String strField, Integer intField) { this.strField = strField; this.intField = intField; + this.kryoClassAField = null; + this.kryoClassBField = null; + } + + public TestPojo(String strField, Integer intField, TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) { + this.strField = strField; + this.intField = intField; + this.kryoClassAField = classAField; + this.kryoClassBField = classBfield; } public String getStrField() { @@ -2409,6 +2555,22 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten this.intField = intField; } + public TestNestedPojoClassA getKryoClassAField() { + return kryoClassAField; + } + + public void setKryoClassAField(TestNestedPojoClassA kryoClassAField) { + this.kryoClassAField = kryoClassAField; + } + + public TestNestedPojoClassB getKryoClassBField() { + return kryoClassBField; + } + + public void setKryoClassBField(TestNestedPojoClassB kryoClassBField) { + this.kryoClassBField = kryoClassBField; + } + @Override public String toString() { return "TestPojo{" + @@ -2424,14 +2586,133 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten TestPojo testPojo = (TestPojo) o; - if (!strField.equals(testPojo.strField)) return false; - return intField.equals(testPojo.intField); + return strField.equals(testPojo.strField) + && intField.equals(testPojo.intField) + && ((kryoClassAField == null && testPojo.kryoClassAField == null) || kryoClassAField.equals(testPojo.kryoClassAField)) + && ((kryoClassBField == null && testPojo.kryoClassBField == null) || kryoClassBField.equals(testPojo.kryoClassBField)); } @Override public int hashCode() { int result = strField.hashCode(); result = 31 * result + intField.hashCode(); + + if (kryoClassAField != null) { + result = 31 * result + kryoClassAField.hashCode(); + } + + if (kryoClassBField != null) { + result = 31 * result + kryoClassBField.hashCode(); + } + + return result; + } + } + + public static class TestNestedPojoClassA implements Serializable { + private Double doubleField; + private Integer intField; + + public TestNestedPojoClassA() {} + + public TestNestedPojoClassA(Double doubleField, Integer intField) { + this.doubleField = doubleField; + this.intField = intField; + } + + public Double getDoubleField() { + return doubleField; + } + + public void setDoubleField(Double doubleField) { + this.doubleField = doubleField; + } + + public Integer getIntField() { + return intField; + } + + public void setIntField(Integer intField) { + this.intField = intField; + } + + @Override + public String toString() { + return "TestNestedPojoClassA{" + + "doubleField='" + doubleField + '\'' + + ", intField=" + intField + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestNestedPojoClassA testNestedPojoClassA = (TestNestedPojoClassA) o; + + if (!doubleField.equals(testNestedPojoClassA.doubleField)) return false; + return intField.equals(testNestedPojoClassA.intField); + } + + @Override + public int hashCode() { + int result = doubleField.hashCode(); + result = 31 * result + intField.hashCode(); + return result; + } + } + + public static class TestNestedPojoClassB implements Serializable { + private Double doubleField; + private String strField; + + public TestNestedPojoClassB() {} + + public TestNestedPojoClassB(Double doubleField, String strField) { + this.doubleField = doubleField; + this.strField = strField; + } + + public Double getDoubleField() { + return doubleField; + } + + public void setDoubleField(Double doubleField) { + this.doubleField = doubleField; + } + + public String getStrField() { + return strField; + } + + public void setStrField(String strField) { + this.strField = strField; + } + + @Override + public String toString() { + return "TestNestedPojoClassB{" + + "doubleField='" + doubleField + '\'' + + ", strField=" + strField + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestNestedPojoClassB testNestedPojoClassB = (TestNestedPojoClassB) o; + + if (!doubleField.equals(testNestedPojoClassB.doubleField)) return false; + return strField.equals(testNestedPojoClassB.strField); + } + + @Override + public int hashCode() { + int result = doubleField.hashCode(); + result = 31 * result + strField.hashCode(); return result; } }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java index 976b9aa..8289821 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -29,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -50,8 +52,8 @@ public class CopyOnWriteStateTableTest extends TestLogger { */ @Test public void testPutGetRemoveContainsTransform() throws Exception { - RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, @@ -122,8 +124,8 @@ public class CopyOnWriteStateTableTest extends TestLogger { */ @Test public void testIncrementalRehash() { - RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, @@ -167,8 +169,8 @@ public class CopyOnWriteStateTableTest extends TestLogger { @Test public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception { - final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = - new RegisteredBackendStateMetaInfo<>( + final RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, @@ -322,8 +324,8 @@ public class CopyOnWriteStateTableTest extends TestLogger { */ @Test public void testCopyOnWriteContracts() { - RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, @@ -397,8 +399,8 @@ public class CopyOnWriteStateTableTest extends TestLogger { final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();; final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();; - RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.VALUE, "test", namespaceSerializer, @@ -649,5 +651,15 @@ public class CopyOnWriteStateTableTest extends TestLogger { public void disable() { this.disabled = true; } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java index 6fd94f7..85bc177 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; -import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.junit.Assert; import org.junit.Test; @@ -44,8 +44,8 @@ public class StateTableSnapshotCompatibilityTest { @Test public void checkCompatibleSerializationFormats() throws IOException { final Random r = new Random(42); - RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = - new RegisteredBackendStateMetaInfo<>( + RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( StateDescriptor.Type.UNKNOWN, "test", IntSerializer.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java index 146ccd0..ce23f30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.testutils.recordutils; import java.io.IOException; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Record; @@ -141,4 +143,14 @@ public final class RecordSerializer extends TypeSerializer<Record> { public int hashCode() { return RecordSerializer.class.hashCode(); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public CompatibilityResult<Record> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 6871159..468fddc 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -18,8 +18,9 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataOutputView, DataInputView} +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot} +import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot +import org.apache.flink.core.memory.{DataInputView, DataOutputView} /** * Serializer for [[Either]]. @@ -104,4 +105,47 @@ class EitherSerializer[A, B, T <: Either[A, B]]( override def hashCode(): Int = { 31 * leftSerializer.hashCode() + rightSerializer.hashCode() } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + override def snapshotConfiguration(): EitherSerializerConfigSnapshot = { + new EitherSerializerConfigSnapshot( + leftSerializer.snapshotConfiguration(), + rightSerializer.snapshotConfiguration()) + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = { + + configSnapshot match { + case eitherSerializerConfig: EitherSerializerConfigSnapshot => + val leftRightConfigs = + eitherSerializerConfig.getNestedSerializerConfigSnapshots + + val leftCompatResult = leftSerializer.ensureCompatibility(leftRightConfigs(0)) + val rightCompatResult = rightSerializer.ensureCompatibility(leftRightConfigs(1)) + + if (leftCompatResult.requiresMigration || rightCompatResult.requiresMigration) { + if (leftCompatResult.getConvertDeserializer != null + && rightCompatResult.getConvertDeserializer != null) { + + CompatibilityResult.requiresMigration( + new EitherSerializer[A, B, T]( + leftCompatResult.getConvertDeserializer, + rightCompatResult.getConvertDeserializer + ) + ) + + } else { + CompatibilityResult.requiresMigration(null) + } + } else { + CompatibilityResult.compatible() + } + + case _ => CompatibilityResult.requiresMigration(null) + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala index 67c1445..dc96c98 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala @@ -17,10 +17,14 @@ */ package org.apache.flink.api.scala.typeutils +import java.io.IOException + import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot} import org.apache.flink.api.common.typeutils.base.IntSerializer +import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream} import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.util.InstantiationUtil /** * Serializer for [[Enumeration]] values. @@ -67,4 +71,114 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[ override def canEqual(obj: scala.Any): Boolean = { obj.isInstanceOf[EnumValueSerializer[_]] } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = { + new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E]( + enum.getClass.asInstanceOf[Class[E]]) + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[E#Value] = { + + configSnapshot match { + case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] => + val enumClass = enum.getClass.asInstanceOf[Class[E]] + if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) { + val currentEnumConstants = enumSerializerConfigSnapshot.getEnumClass.getEnumConstants + + for ( i <- 0 to currentEnumConstants.length) { + // compatible only if new enum constants are only appended, + // and original constants must be in the exact same order + + if (currentEnumConstants(i) != enumSerializerConfigSnapshot.getEnumConstants(i)) { + CompatibilityResult.requiresMigration(null) + } + } + + CompatibilityResult.compatible() + } else { + CompatibilityResult.requiresMigration(null) + } + + case _ => CompatibilityResult.requiresMigration(null) + } + } +} + +object EnumValueSerializer { + + class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var enumClass: Class[E]) + extends TypeSerializerConfigSnapshot { + + var enumConstants: Array[E] = enumClass.getEnumConstants + + /** This empty nullary constructor is required for deserializing the configuration. */ + def this() = this(null) + + override def write(out: DataOutputView): Unit = { + super.write(out) + + try { + val outViewWrapper = new DataOutputViewStream(out) + try { + InstantiationUtil.serializeObject(outViewWrapper, enumClass) + InstantiationUtil.serializeObject(outViewWrapper, enumConstants) + } finally if (outViewWrapper != null) outViewWrapper.close() + } + } + + override def read(in: DataInputView): Unit = { + super.read(in) + + try { + val inViewWrapper = new DataInputViewStream(in) + try + try { + enumClass = InstantiationUtil.deserializeObject( + inViewWrapper, getUserCodeClassLoader) + + enumConstants = InstantiationUtil.deserializeObject( + inViewWrapper, getUserCodeClassLoader) + } catch { + case e: ClassNotFoundException => + throw new IOException("The requested enum class cannot be found in classpath.", e) + } + finally if (inViewWrapper != null) inViewWrapper.close() + } + } + + override def getVersion: Int = ScalaEnumSerializerConfigSnapshot.VERSION + + def getEnumClass: Class[E] = enumClass + + def getEnumConstants: Array[E] = enumConstants + + override def equals(obj: scala.Any): Boolean = { + if (obj == this) { + return true + } + + if (obj == null) { + return false + } + + obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] && + enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass) && + enumConstants.sameElements( + obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumConstants) + } + + override def hashCode(): Int = { + enumClass.hashCode() * 31 + enumConstants.toSeq.hashCode() + } + } + + object ScalaEnumSerializerConfigSnapshot { + val VERSION = 1 + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala index fa5279e..01ca295 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala @@ -18,8 +18,8 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataOutputView, DataInputView} +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} /** * Serializer for cases where no serializer is required but the system still expects one. This @@ -50,13 +50,19 @@ class NothingSerializer extends TypeSerializer[Any] { override def serialize(any: Any, target: DataOutputView): Unit = throw new RuntimeException("This must not be used. You encountered a bug.") - override def deserialize(source: DataInputView): Any = throw new RuntimeException("This must not be used. You encountered a bug.") override def deserialize(reuse: Any, source: DataInputView): Any = throw new RuntimeException("This must not be used. You encountered a bug.") + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = + throw new RuntimeException("This must not be used. You encountered a bug.") + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Any] = + throw new RuntimeException("This must not be used. You encountered a bug.") + override def equals(obj: Any): Boolean = { obj match { case nothingSerializer: NothingSerializer => nothingSerializer.canEqual(this) http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index a8b3a56..d2bb098 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -18,8 +18,8 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataOutputView, DataInputView} +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.core.memory.{DataInputView, DataOutputView} /** * Serializer for [[Option]]. @@ -95,4 +95,52 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) override def hashCode(): Int = { elemSerializer.hashCode() } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot = { + new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer.snapshotConfiguration()) + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = { + configSnapshot match { + case optionSerializerConfigSnapshot: OptionSerializer.OptionSerializerConfigSnapshot => + val compatResult = elemSerializer.ensureCompatibility( + optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot) + + if (compatResult.requiresMigration()) { + if (compatResult.getConvertDeserializer != null) { + CompatibilityResult.requiresMigration( + new OptionSerializer[A](compatResult.getConvertDeserializer)) + } else { + CompatibilityResult.requiresMigration(null) + } + } else { + CompatibilityResult.compatible() + } + + case _ => CompatibilityResult.requiresMigration(null) + } + } +} + +object OptionSerializer { + + class OptionSerializerConfigSnapshot( + private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot) + extends CompositeTypeSerializerConfigSnapshot(elemSerializerConfigSnapshot) { + + /** This empty nullary constructor is required for deserializing the configuration. */ + def this() = this(null) + + override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION + } + + object OptionSerializerConfigSnapshot { + val VERSION = 1 + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala index d1b9085..1ac46f9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala @@ -20,8 +20,8 @@ package org.apache.flink.api.scala.typeutils import java.io.ObjectInputStream import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataOutputView, DataInputView} +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} import scala.collection.generic.CanBuildFrom @@ -150,4 +150,13 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( override def canEqual(obj: Any): Boolean = { obj.isInstanceOf[TraversableSerializer[_, _]] } + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = { + throw new UnsupportedOperationException() + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = { + throw new UnsupportedOperationException() + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index a5ec03a..c864dc7 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -19,11 +19,12 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.KryoSerializerConfigSnapshot import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import scala.util.{Success, Try, Failure} +import scala.util.{Failure, Success, Try} /** * Serializer for [[scala.util.Try]]. @@ -98,4 +99,57 @@ class TrySerializer[A]( override def hashCode(): Int = { 31 * elemSerializer.hashCode() + executionConfig.hashCode() } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = { + new TrySerializer.TrySerializerConfigSnapshot( + elemSerializer.snapshotConfiguration(), + throwableSerializer.snapshotConfiguration()) + } + + override def ensureCompatibility( + configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = { + + configSnapshot match { + case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot => + val serializerConfigSnapshots = + trySerializerConfigSnapshot.getNestedSerializerConfigSnapshots + + val elemCompatRes = + elemSerializer.ensureCompatibility(serializerConfigSnapshots(0)) + val throwableCompatRes = + throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1)) + + if (elemCompatRes.requiresMigration() || throwableCompatRes.requiresMigration()) { + CompatibilityResult.requiresMigration(null) + } else { + CompatibilityResult.compatible() + } + + case _ => CompatibilityResult.requiresMigration(null) + } + } +} + +object TrySerializer { + + class TrySerializerConfigSnapshot( + private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot, + private var throwableSerializerConfigSnapshot: KryoSerializerConfigSnapshot[Throwable]) + extends CompositeTypeSerializerConfigSnapshot( + elemSerializerConfigSnapshot, throwableSerializerConfigSnapshot) { + + /** This empty nullary constructor is required for deserializing the configuration. */ + def this() = this(null, null) + + override def getVersion: Int = TrySerializerConfigSnapshot.VERSION + } + + object TrySerializerConfigSnapshot { + val VERSION = 1 + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java index fdcd5b8..53fea46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -21,7 +21,11 @@ package org.apache.flink.migration.streaming.runtime.streamrecord; import static java.util.Objects.requireNonNull; import java.io.IOException; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.watermark.Watermark; @@ -205,6 +209,53 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream } } + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() { + return new MultiplexingStreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( + ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } + + /** + * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}. + */ + public static final class MultiplexingStreamRecordSerializerConfigSnapshot + extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public MultiplexingStreamRecordSerializerConfigSnapshot() {} + + public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { + super(typeSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java index 2c8dc4a..2a87f4e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java @@ -20,7 +20,10 @@ package org.apache.flink.migration.streaming.runtime.streamrecord; import java.io.IOException; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -144,4 +147,50 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord public int hashCode() { return typeSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public StreamRecordSerializerConfigSnapshot snapshotConfiguration() { + return new StreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( + ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.requiresMigration(null); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new StreamRecordSerializer<>(compatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } + + /** + * Configuration snapshot specific to the {@link StreamRecordSerializer}. + */ + public static final class StreamRecordSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public StreamRecordSerializerConfigSnapshot() {} + + public StreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { + super(typeSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 13a8a24..f0c3dc2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -29,7 +29,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -547,6 +549,16 @@ public class CoGroupedStreams<T1, T2> { public boolean canEqual(Object obj) { return obj instanceof UnionSerializer; } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } + + @Override + public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java index 7b6ba8d..1455712 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.operators; import java.io.IOException; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -184,5 +186,15 @@ public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> { public int hashCode() { return getClass().hashCode(); } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } + + @Override + public CompatibilityResult<InternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java index b1e06f5..cf5c74c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windows; import java.io.IOException; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -61,7 +62,7 @@ public class GlobalWindow extends Window { /** * A {@link TypeSerializer} for {@link GlobalWindow}. */ - public static class Serializer extends TypeSerializer<GlobalWindow> { + public static class Serializer extends TypeSerializerSingleton<GlobalWindow> { private static final long serialVersionUID = 1L; @Override @@ -70,11 +71,6 @@ public class GlobalWindow extends Window { } @Override - public TypeSerializer<GlobalWindow> duplicate() { - return this; - } - - @Override public GlobalWindow createInstance() { return GlobalWindow.INSTANCE; } @@ -119,18 +115,8 @@ public class GlobalWindow extends Window { } @Override - public boolean equals(Object obj) { - return obj instanceof Serializer; - } - - @Override public boolean canEqual(Object obj) { return obj instanceof Serializer; } - - @Override - public int hashCode() { - return 0; - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 6d896cb..a7ea244 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -26,7 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -130,7 +130,7 @@ public class TimeWindow extends Window { /** * The serializer used to write the TimeWindow type. */ - public static class Serializer extends TypeSerializer<TimeWindow> { + public static class Serializer extends TypeSerializerSingleton<TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -139,11 +139,6 @@ public class TimeWindow extends Window { } @Override - public TypeSerializer<TimeWindow> duplicate() { - return this; - } - - @Override public TimeWindow createInstance() { return null; } @@ -188,19 +183,9 @@ public class TimeWindow extends Window { } @Override - public boolean equals(Object obj) { - return obj instanceof Serializer; - } - - @Override public boolean canEqual(Object obj) { return obj instanceof Serializer; } - - @Override - public int hashCode() { - return 0; - } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 7fe088a..5c52fa6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -22,7 +22,10 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.watermark.Watermark; @@ -262,4 +265,54 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme public int hashCode() { return typeSerializer.hashCode(); } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // + // This serializer may be used by Flink internal operators that need to checkpoint + // buffered records. Therefore, it may be part of managed state and need to implement + // the configuration snapshot and compatibility methods. + // -------------------------------------------------------------------------------------------- + + @Override + public StreamElementSerializerConfigSnapshot snapshotConfiguration() { + return new StreamElementSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + } + + @Override + public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { + CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( + ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + + if (!compatResult.requiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new StreamElementSerializer<>(compatResult.getConvertDeserializer())); + } + } + + return CompatibilityResult.requiresMigration(null); + } + + /** + * Configuration snapshot specific to the {@link StreamElementSerializer}. + */ + public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public StreamElementSerializerConfigSnapshot() {} + + public StreamElementSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { + super(typeSerializerConfigSnapshot); + } + + @Override + public int getVersion() { + return VERSION; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java index a24a3a8..2693bc1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java @@ -228,6 +228,5 @@ public class CheckpointingCustomKvStateProgram { public boolean canEqual(Object obj) { return obj instanceof CustomIntSerializer; } - } }