Github user MayerRoman commented on the issue:

    https://github.com/apache/flink/pull/3031
  
    I think the changes I propose make it impossible to restore from 
checkpoints created before my code changes.
    
    This is because the state is now saved as 
ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> (partition + offset + watermark),
    whereas before it was saved as ListState<Tuple2<KafkaTopicPartition, Long>> 
(partition + offset).
    
    (I mean checkpoints from versions later than 1.1.
    The recently added backward compatibility with 1.1 snapshots is taken into 
account in my commit, so I think everything is OK there.)
    
    
    Please advise me on how to restore backward compatibility.
    
    I have some ideas for how to implement it:
    
    1)  In the initializeState method, somehow check the object returned from 
stateStore.getSerializableListState(..)
    
https://github.com/apache/flink/pull/3031/files?diff=unified#diff-06bf4a7f73d98ef91309154654563475R321
    
    to determine whether it is a
    ListState<Tuple2<KafkaTopicPartition, Long>>
    or a
    ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>>
    
    2)  Use a separate state object for saving the watermark.
    
    Or perhaps it needs to be implemented in a different way.
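    
    To make idea 2 concrete, here is a minimal sketch in plain Java, not 
real Flink code. The class names (OffsetEntry, WatermarkEntry, 
RestoredPartition), the restore helper, the use of a String in place of 
KafkaTopicPartition, and the choice of Long.MIN_VALUE as the "no watermark 
yet" default are all assumptions made for illustration. The offsets list 
keeps its old (partition + offset) layout, and the watermarks live in a 
second list that is simply empty when restoring from an old checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SeparateWatermarkStateSketch {

    // Assumed default for partitions that have no saved watermark.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Hypothetical stand-in for one entry of the existing (partition, offset) state. */
    static final class OffsetEntry {
        final String partition;
        final long offset;

        OffsetEntry(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for one entry of the new, separate watermark state. */
    static final class WatermarkEntry {
        final String partition;
        final long watermark;

        WatermarkEntry(String partition, long watermark) {
            this.partition = partition;
            this.watermark = watermark;
        }
    }

    /** What the consumer would hold in memory after restoring one partition. */
    static final class RestoredPartition {
        final long offset;
        final long watermark;

        RestoredPartition(long offset, long watermark) {
            this.offset = offset;
            this.watermark = watermark;
        }
    }

    /**
     * Merges the two restored lists. An old checkpoint has no watermark
     * entries, so every partition falls back to NO_WATERMARK.
     */
    static Map<String, RestoredPartition> restore(
            List<OffsetEntry> offsets, List<WatermarkEntry> watermarks) {
        Map<String, Long> watermarkByPartition = new HashMap<>();
        for (WatermarkEntry w : watermarks) {
            watermarkByPartition.put(w.partition, w.watermark);
        }
        Map<String, RestoredPartition> restored = new HashMap<>();
        for (OffsetEntry o : offsets) {
            long watermark = watermarkByPartition.getOrDefault(o.partition, NO_WATERMARK);
            restored.put(o.partition, new RestoredPartition(o.offset, watermark));
        }
        return restored;
    }

    public static void main(String[] args) {
        List<OffsetEntry> offsets = new ArrayList<>();
        offsets.add(new OffsetEntry("topic-0", 42L));

        // Old checkpoint: offsets only, the watermark list is empty.
        RestoredPartition fromOld = restore(offsets, new ArrayList<>()).get("topic-0");
        System.out.println(fromOld.offset + " / " + fromOld.watermark);

        // New checkpoint: both lists are populated.
        List<WatermarkEntry> watermarks = new ArrayList<>();
        watermarks.add(new WatermarkEntry("topic-0", 1000L));
        RestoredPartition fromNew = restore(offsets, watermarks).get("topic-0");
        System.out.println(fromNew.offset + " / " + fromNew.watermark);
    }
}
```

    With this layout the element type of the existing offsets state never 
changes, so there would be no need to inspect what 
getSerializableListState(..) returns, as idea 1 requires. Whether a second 
state object fits the connector's design is a question for the reviewers.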
    
    I would be grateful for help.


