Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3834#discussion_r115139339

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
     protected <N, S> ColumnFamilyHandle getColumnFamily(
             StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
-        Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
+        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                 kvStateInformation.get(descriptor.getName());
-        RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
-                descriptor.getType(),
-                descriptor.getName(),
-                namespaceSerializer,
-                descriptor.getSerializer());
+        RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+            descriptor.getType(),
+            descriptor.getName(),
+            namespaceSerializer,
+            descriptor.getSerializer());
         if (stateInfo != null) {
-            if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
+            // TODO with eager registration in place, these checks should be moved to restore()
+
+            RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
+                restoredKvStateMetaInfos.get(descriptor.getName());
+
+            Preconditions.checkState(
+                newMetaInfo.getName().equals(restoredMetaInfo.getName()),
+                "Incompatible state names. " +
+                    "Was [" + restoredMetaInfo.getName() + "], " +
+                    "registered with [" + newMetaInfo.getName() + "].");
+
+            if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+                && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+                Preconditions.checkState(
+                    newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
+                    "Incompatible state types. " +
+                        "Was [" + restoredMetaInfo.getStateType() + "], " +
+                        "registered with [" + newMetaInfo.getStateType() + "].");
+            }
+
+            // check serializer migration strategies to determine if state migration is required
+
+            boolean requireMigration = false;
+
+            // only check migration strategy if there is a restored configuration snapshot;
+            // there wouldn't be one if we were restored from an older version checkpoint,
+            // in which case we can only simply assume that migration is not required
+
+            if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
+                MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
+                    .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
+
+                TypeSerializer<N> finalOldNamespaceSerializer;
+                if (namespaceMigrationStrategy.requireMigration()) {
+                    requireMigration = true;
+
+                    if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
+                        finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
+                    } else if (restoredMetaInfo.getNamespaceSerializer() != null
+                        && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
+                        finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
+                    } else {
+                        throw new RuntimeException(
+                            "State migration required, but there is no available serializer capable of reading previous namespace.");
+                    }
+                }
+            }
+
+            if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
+                MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
+                    .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
+
+                TypeSerializer<S> finalOldStateSerializer;
+                if (stateMigrationStrategy.requireMigration()) {
+                    requireMigration = true;
+
+                    if (stateMigrationStrategy.getFallbackDeserializer() != null) {
--- End diff --

This whole `if` is interesting: basically you give the `FallbackDeserializer` priority over the actual old serializer that we
could get via Java serialization. Originally, I thought the intended flow was: check compatibility by confronting the user-provided serializer with the stored config. In case we need to convert, first try to load the former serializer through Java serialization (if we succeed, it is a safe bet that this class can read the old state). If Java deserialization fails, we use the `FallbackDeserializer` provided by the new serializer (this should also be a safe bet, except if the implementation is wrong, so overall slightly less safe). Now here is the question: I think in the serializer you combine the compatibility check and the potential creation of the `FallbackDeserializer` in a single method, because two separate methods would partially do similar, duplicated work. The downside is that, given the intended flow, we only need to create and use a `FallbackDeserializer` if we cannot load the old serializer through Java deserialization. I can see that this flow could also work, or we can still use the flow that prioritizes Java serialization and potentially creates an unused `FallbackDeserializer`. What do you think, and is there some point I did not consider that led to this choice?
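
To make the two orderings concrete, here is a minimal sketch. It is not Flink code: the class `OldSerializerResolution` and both method names are hypothetical, and a plain generic `T` stands in for `TypeSerializer`. A `null` argument models "not available" (for the Java-deserialized serializer, that would include the case where only a `MigrationNamespaceSerializerProxy` could be restored). The second variant takes a `Supplier` only to illustrate how the fallback could be created lazily if the compatibility check and the fallback creation were separated; the PR as written does not do this.

```java
import java.util.function.Supplier;

public final class OldSerializerResolution {

    private static final String NO_READER =
        "State migration required, but no serializer can read the previous state.";

    /** Order used in the diff: the fallback deserializer wins whenever it exists. */
    public static <T> T fallbackFirst(T fallback, T javaDeserialized) {
        if (fallback != null) {
            return fallback;
        }
        if (javaDeserialized != null) {
            return javaDeserialized;
        }
        throw new IllegalStateException(NO_READER);
    }

    /**
     * Order described in the review: prefer the serializer restored through
     * Java serialization; only ask for a fallback if that one is missing.
     */
    public static <T> T javaSerializationFirst(T javaDeserialized, Supplier<T> fallbackFactory) {
        if (javaDeserialized != null) {
            return javaDeserialized;
        }
        T fallback = fallbackFactory.get(); // created only when actually needed
        if (fallback != null) {
            return fallback;
        }
        throw new IllegalStateException(NO_READER);
    }

    public static void main(String[] args) {
        // Both candidates available: the two orderings pick different readers.
        String a = fallbackFirst("fallback", "old");
        String b = javaSerializationFirst("old", () -> "fallback");
        // Java deserialization failed: both orderings end up with the fallback.
        String c = javaSerializationFirst(null, () -> "fallback");
        System.out.println(a + " " + b + " " + c); // prints "fallback old fallback"
    }
}
```

The two variants only differ when both candidates are available, which is exactly the case the question above is about.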