Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens My only concern is that scanning the whole list of shards can be severely constrained by AWS Kinesis's API invocation rate limits. Also, we would then only be cleaning up the state on restore, meaning we would kind of be encouraging (in a bad way) Kinesis users to snapshot and restore every once in a while. I think the best solution for that is probably to use a threshold constant as Stephan suggested, but we will need to investigate whether the Kinesis API exposes enough information to implement this. I'll open a separate JIRA ticket for this, so we can properly discuss the issues of pruning closed shard states there.
---