This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2946bbeb2799ba0ed4a4417738d3000ab4f0b6ea Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Tue Jan 24 10:39:00 2023 +0800 [FLINK-30613][serializer] Migrate ScalaEnumSerializerSnapshot to implement new method of resolving schema compatibility --- .../typeutils/ScalaEnumSerializerSnapshot.scala | 33 +++++++++++----------- .../EnumValueSerializerCompatibilityTest.scala | 2 +- .../scala/typeutils/EnumValueSerializerTest.scala | 6 +++- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala index f3742a0c841..2cac0129598 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala @@ -26,30 +26,30 @@ import scala.collection.mutable.ListBuffer class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapshot[E#Value] { var enumClass: Class[E] = _ - var previousEnumConstants: List[(String, Int)] = _ + var enumConstants: List[(String, Int)] = _ def this(enum: E) = { this() this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]] - this.previousEnumConstants = enum.values.toList.map(x => (x.toString, x.id)) + this.enumConstants = enum.values.toList.map(x => (x.toString, x.id)) } def this(enumClass: Class[E], previousEnumConstants: List[(String, Int)]) = { this() this.enumClass = Preconditions.checkNotNull(enumClass) - this.previousEnumConstants = Preconditions.checkNotNull(previousEnumConstants) + this.enumConstants = Preconditions.checkNotNull(previousEnumConstants) } override def getCurrentVersion: Int = ScalaEnumSerializerSnapshot.VERSION override def writeSnapshot(out: DataOutputView): Unit = { Preconditions.checkState(enumClass != null) - Preconditions.checkState(previousEnumConstants != null) + Preconditions.checkState(enumConstants != null) out.writeUTF(enumClass.getName) - out.writeInt(previousEnumConstants.length) - for ((name, idx) <- previousEnumConstants) { + out.writeInt(enumConstants.length) + for ((name, idx) <- enumConstants) { out.writeUTF(name) out.writeInt(idx) } @@ -71,7 +71,7 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh listBuffer += ((name, idx)) } - previousEnumConstants = listBuffer.toList + enumConstants = listBuffer.toList } override def restoreSerializer(): TypeSerializer[E#Value] = { @@ -80,25 +80,26 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh new EnumValueSerializer(enumObject) } - override def resolveSchemaCompatibility( - newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = { + override def resolveSchemaCompatibility(oldSerializerSnapshot: TypeSerializerSnapshot[E#Value]) + : TypeSerializerSchemaCompatibility[E#Value] = { Preconditions.checkState(enumClass != null) - Preconditions.checkState(previousEnumConstants != null) + Preconditions.checkState(enumConstants != null) - if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) { + if (!oldSerializerSnapshot.isInstanceOf[ScalaEnumSerializerSnapshot[E]]) { return TypeSerializerSchemaCompatibility.incompatible() } - val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]] - if (!enumClass.equals(newEnumSerializer.enum.getClass)) { + val oldEnumSerializerSnapshot = + oldSerializerSnapshot.asInstanceOf[ScalaEnumSerializerSnapshot[E]] + if (!enumClass.equals(oldEnumSerializerSnapshot.enumClass)) { return TypeSerializerSchemaCompatibility.incompatible() } - for ((previousEnumName, index) <- previousEnumConstants) { + for ((oldEnumName, index) <- oldEnumSerializerSnapshot.enumConstants) { try { - val newEnumName = newEnumSerializer.enum(index).toString - if (previousEnumName != newEnumName) { + val enumName = enumConstants(index)._1 + if (enumName != oldEnumName) { return TypeSerializerSchemaCompatibility.incompatible() } } catch { diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala index 228a816368d..c5e4b329735 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala @@ -141,7 +141,7 @@ class EnumValueSerializerCompatibilityTest { val enum2 = instantiateEnum[Enumeration](classLoader2, enumName) val enumValueSerializer2 = new EnumValueSerializer(enum2) - snapshot2.resolveSchemaCompatibility(enumValueSerializer2) + enumValueSerializer2.snapshotConfiguration().resolveSchemaCompatibility(snapshot2) } } diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala index 9a9faf9c369..a564eac50c9 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala @@ -32,7 +32,11 @@ class EnumValueSerializerTest { val snapshot = enumSerializer.snapshotConfiguration() - assertThat(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs).isTrue + assertThat( + enumSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(snapshot) + .isCompatibleAsIs).isTrue } }