This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 14aae5937e27a9df806dc6cce9bcb88c9188802b Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Wed Feb 27 12:10:08 2019 +0800 [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks This commit lets the InternalTimerServiceImpl properly use TypeSerializerSchemaCompatibility / TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to check the compatibility of new key and namespace serializers. This also fixes a bug in the previous check: the key / namespace serializers were not reassigned to the reconfigured serializers. --- .../api/operators/InternalTimerServiceImpl.java | 42 ++++++++++++---------- .../InternalTimersSnapshotReaderWriters.java | 10 ++++-- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index c2088d0..a7a3490 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; @@ -142,26 +141,31 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>, // the following is the case where we restore if (restoredTimersSnapshot != null) { - CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.keyDeserializer, - null, - restoredTimersSnapshot.getKeySerializerSnapshot(), - keySerializer); - - CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( - this.namespaceDeserializer, - null, - restoredTimersSnapshot.getNamespaceSerializerSnapshot(), - namespaceSerializer); - - if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) { - throw new IllegalStateException("Tried to initialize restored TimerService " + - "with incompatible serializers than those used to snapshot its state."); + TypeSerializerSchemaCompatibility<K> keySerializerCompatibility = + restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer); + + if (keySerializerCompatibility.isIncompatible() || keySerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new key serializer that requires migration or is incompatible."); } + + TypeSerializerSchemaCompatibility<N> namespaceSerializerCompatibility = + restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer); + + if (namespaceSerializerCompatibility.isIncompatible() || namespaceSerializerCompatibility.isCompatibleAfterMigration()) { + throw new IllegalStateException( + "Tried to initialize restored TimerService with new namespace serializer that requires migration or is incompatible."); + } + + this.keySerializer = keySerializerCompatibility.isCompatibleAsIs() + ? keySerializer : keySerializerCompatibility.getReconfiguredSerializer(); + this.namespaceSerializer = namespaceSerializerCompatibility.isCompatibleAsIs() + ? 
namespaceSerializer : namespaceSerializerCompatibility.getReconfiguredSerializer(); + } else { + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; } - this.keySerializer = keySerializer; - this.namespaceSerializer = namespaceSerializer; this.keyDeserializer = null; this.namespaceDeserializer = null; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java index a937a5c..d61e542 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.BackwardsCompatibleSerializerSnapshot; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; @@ -265,8 +266,13 @@ public class InternalTimersSnapshotReaderWriters { DataInputViewStream dis = new DataInputViewStream(in); try { - restoredTimersSnapshot.setKeySerializer(InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true)); - restoredTimersSnapshot.setNamespaceSerializer(InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true)); + final TypeSerializer<K> keySerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true); + final TypeSerializer<N> namespaceSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true); + + restoredTimersSnapshot.setKeySerializer(keySerializer); + restoredTimersSnapshot.setKeySerializerSnapshot(new 
BackwardsCompatibleSerializerSnapshot<>(keySerializer)); + restoredTimersSnapshot.setNamespaceSerializer(namespaceSerializer); + restoredTimersSnapshot.setNamespaceSerializerSnapshot(new BackwardsCompatibleSerializerSnapshot<>(namespaceSerializer)); } catch (ClassNotFoundException exception) { throw new IOException(exception); }