GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139339
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
        protected <N, S> ColumnFamilyHandle getColumnFamily(
                        StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
     
    -           Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
    +           Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                                kvStateInformation.get(descriptor.getName());
     
    -           RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
    -                           descriptor.getType(),
    -                           descriptor.getName(),
    -                           namespaceSerializer,
    -                           descriptor.getSerializer());
    +           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +                   descriptor.getType(),
    +                   descriptor.getName(),
    +                   namespaceSerializer,
    +                   descriptor.getSerializer());
     
                if (stateInfo != null) {
    -                   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
    +                   // TODO with eager registration in place, these checks should be moved to restore()
    +
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    +                           restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +                   Preconditions.checkState(
    +                           newMetaInfo.getName().equals(restoredMetaInfo.getName()),
    +                           "Incompatible state names. " +
    +                                   "Was [" + restoredMetaInfo.getName() + "], " +
    +                                   "registered with [" + newMetaInfo.getName() + "].");
    +
    +                   if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                           && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
    +
    +                           Preconditions.checkState(
    +                                   newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
    +                                   "Incompatible state types. " +
    +                                           "Was [" + restoredMetaInfo.getStateType() + "], " +
    +                                           "registered with [" + newMetaInfo.getStateType() + "].");
    +                   }
    +
    +                   // check serializer migration strategies to determine if state migration is required
    +
    +                   boolean requireMigration = false;
    +
    +                   // only check migration strategy if there is a restored configuration snapshot;
    +                   // there wouldn't be one if we were restored from an older version checkpoint,
    +                   // in which case we can only simply assume that migration is not required
    +
    +                   if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
    +                           MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
    +                                   .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
    +
    +                           TypeSerializer<N> finalOldNamespaceSerializer;
    +                           if (namespaceMigrationStrategy.requireMigration()) {
    +                                   requireMigration = true;
    +
    +                                   if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
    +                                           finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
    +                                   } else if (restoredMetaInfo.getNamespaceSerializer() != null
    +                                           && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
    +                                           finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
    +                                   } else {
    +                                           throw new RuntimeException(
    +                                                   "State migration required, but there is no available serializer capable of reading previous namespace.");
    +                                   }
    +                           }
    +                   }
    +
    +                   if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
    +                           MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
    +                                   .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
    +
    +                           TypeSerializer<S> finalOldStateSerializer;
    +                           if (stateMigrationStrategy.requireMigration()) {
    +                                   requireMigration = true;
    +
    +                                   if (stateMigrationStrategy.getFallbackDeserializer() != null) {
    --- End diff --
    
    This whole `if` is interesting: basically, you give the `FallbackDeserializer` priority over the actual old serializer that we could get via Java deserialization.
    Originally, I thought the intended flow was: check compatibility by confronting the user-provided serializer with the stored config. In case we need to convert, first try to load the former serializer through Java deserialization (if that succeeds, it is a safe bet that this class can read the old state). If Java deserialization fails, we use the `FallbackDeserializer` provided by the new serializer (this should also be a safe bet, unless its implementation is wrong, so overall slightly less safe).
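A minimal sketch contrasting the order used in the diff with the order described above. This is not Flink code: `Deserializer`, `resolveFallbackFirst` and `resolveJavaFirst` are hypothetical names, and a `null` argument stands in both for "no fallback was provided" and for "the old serializer could not be recovered via Java deserialization" (which, in the diff, also covers the `MigrationNamespaceSerializerProxy` case).

```java
// Hypothetical stand-in for a serializer that can read previous state (not Flink's TypeSerializer).
interface Deserializer {
    String name();
}

final class SerializerResolution {

    private SerializerResolution() {}

    // Order used in the diff: a provided fallback deserializer always wins; the old
    // serializer recovered via Java deserialization is only the second choice.
    static Deserializer resolveFallbackFirst(Deserializer fallback, Deserializer javaRestored) {
        if (fallback != null) {
            return fallback;
        }
        if (javaRestored != null) {
            return javaRestored;
        }
        throw new IllegalStateException(
                "State migration required, but no serializer can read the previous state.");
    }

    // Order described in the review: prefer the Java-restored old serializer and use the
    // fallback deserializer only if Java deserialization failed.
    static Deserializer resolveJavaFirst(Deserializer fallback, Deserializer javaRestored) {
        if (javaRestored != null) {
            return javaRestored;
        }
        if (fallback != null) {
            return fallback;
        }
        throw new IllegalStateException(
                "State migration required, but no serializer can read the previous state.");
    }
}
```

When both a fallback and a Java-restored serializer are available, the first method picks the fallback and the second picks the old serializer; both fail only when neither is available.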
    
    Now here is the question: I think in the serializer you combine the compatibility check and the potential creation of the `FallbackDeserializer` in a single method, because two separate methods would partially do similar, duplicated work.
    The downside is that the fallback then always gets created, whereas, given the intended flow, we only need to create and use a `FallbackDeserializer` if we cannot load the old serializer through Java deserialization.
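One possible way to keep the check and the fallback in a single call while still building the fallback only when Java deserialization fails is to defer its construction. The sketch below is hypothetical and is not Flink's `MigrationStrategy` API: `MigrationCheck` and `LazyResolution` are illustrative names, a `java.util.function.Supplier` stands in for the fallback creation, and a `null` Java-restored serializer models a failed Java deserialization.

```java
import java.util.function.Supplier;

// Hypothetical result of a combined compatibility check (not Flink's MigrationStrategy):
// the fallback is described by a factory instead of being built eagerly.
final class MigrationCheck<T> {
    private final boolean requiresMigration;
    private final Supplier<T> fallbackFactory; // may be null if no fallback can be offered
    private T fallback;                        // cached after the first successful build

    MigrationCheck(boolean requiresMigration, Supplier<T> fallbackFactory) {
        this.requiresMigration = requiresMigration;
        this.fallbackFactory = fallbackFactory;
    }

    boolean requiresMigration() {
        return requiresMigration;
    }

    // Builds the fallback on first use; returns null when no factory was supplied.
    T fallback() {
        if (fallback == null && fallbackFactory != null) {
            fallback = fallbackFactory.get();
        }
        return fallback;
    }
}

final class LazyResolution {

    private LazyResolution() {}

    // Java-first order: the fallback factory only runs if Java deserialization failed.
    static <T> T resolve(MigrationCheck<T> check, T javaRestored) {
        if (!check.requiresMigration()) {
            throw new IllegalArgumentException("resolve() is only meaningful when migration is required");
        }
        if (javaRestored != null) {
            return javaRestored;
        }
        T fallback = check.fallback();
        if (fallback == null) {
            throw new IllegalStateException(
                    "State migration required, but no serializer can read the previous state.");
        }
        return fallback;
    }
}
```

With a Java-restored serializer available, the factory is never invoked; it runs once, and only once, the first time resolution has to fall back.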
    
    I can see that your flow could also work; alternatively, we can keep the flow that prioritizes Java deserialization and accept that it may create an unused `FallbackDeserializer`.
    
    What do you think? Is there some point I did not consider that led to this choice?

