GitHub user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337

Re @StephanEwen: yes, currently the state for closed shards is still kept indefinitely. A special `SHARD_END` marker is stored as the sequence number for each closed shard, so that the consumer does not attempt to read those shards again on restore.

A threshold timestamp could work if the AWS API provides shard creation times. A threshold sequence number would also work if sequence numbers are always monotonically increasing across shards. Both options need some investigation to see whether they are feasible. Either way, I think this improvement is out of scope for the issue at hand, and it would also require a migration path from the old state (where this constant threshold state doesn't exist).

For the 1.4.1 bugfix, I think we should continue with the current approach. It might make sense, though, to fix this via a constant threshold state in 1.5.0.
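To make the trade-off concrete, here is a rough sketch of the two ideas: the current approach, where every closed shard keeps a `SHARD_END` entry in state and is skipped on restore, and one possible reading of the threshold alternative, where closed shards older than a single threshold are dropped from state entirely. This is a hypothetical illustration, not the Flink Kinesis connector's code: the class `ShardStateSketch`, its methods, the `Map<String, String>` state layout, and the assumption that shard creation times are available are all invented for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch only; not the actual Flink Kinesis connector classes.
 * State is modeled as shardId -> last processed sequence number.
 */
public class ShardStateSketch {

    // Hypothetical marker stored instead of a real sequence number once a shard is fully consumed.
    static final String SHARD_END = "SHARD_END";

    // Current approach: every shard, open or closed, keeps an entry in state forever.
    // On restore, shards marked SHARD_END are skipped so they are not read again.
    static List<String> shardsToResume(Map<String, String> restoredState) {
        List<String> toResume = new ArrayList<>();
        for (Map.Entry<String, String> entry : restoredState.entrySet()) {
            if (!SHARD_END.equals(entry.getValue())) {
                toResume.add(entry.getKey());
            }
        }
        return toResume;
    }

    // Threshold alternative (ASSUMES shard creation times are available from the AWS API):
    // closed shards older than the threshold are pruned from state, so a discovered shard
    // with no state entry is only read if it was created at or after the threshold.
    static boolean shouldReadDiscoveredShard(String shardId, long creationTimeMillis,
                                             Map<String, String> state, long thresholdMillis) {
        if (state.containsKey(shardId)) {
            // Shard is still tracked: read it unless it has already been marked as ended.
            return !SHARD_END.equals(state.get(shardId));
        }
        // Untracked shard: older ones are assumed to be closed and already consumed.
        return creationTimeMillis >= thresholdMillis;
    }

    public static void main(String[] args) {
        Map<String, String> state = new LinkedHashMap<>();
        state.put("shard-0", SHARD_END);
        state.put("shard-1", "123");
        System.out.println(shardsToResume(state)); // prints [shard-1]
    }
}
```

The sketch also shows why migration matters: state written under the first scheme has no threshold value, so a restore path would need to derive one (or fall back to the per-shard markers) before the second scheme can be used.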
---