Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/1305#issuecomment-153339094

@StephanEwen Thanks for the comments. You are right, the main idea is exactly as you described.

The reason exactly-once is violated in some corner cases is that the pre-commit phase of the previous checkpoint may still be failing during recovery. If we assume the previous job is completely killed off, with no writing to the database whatsoever after that point, then we can properly clean up during recovery. Unfortunately, this does not seem to hold if you set the retry wait time very low (like 0 ms in the snapshot): the failed job may still be writing the failed snapshot to the database after you have recovered and cleaned up.

As for compaction, I came up with something very similar, but here is the funny thing and my problem: the query you wrote runs properly on Derby but is invalid on MySQL (you cannot reference in a subquery the same table you are modifying). In MySQL you need an inner join instead, but that does not work in Derby :P

In any case, I have made a prototype of this at: https://github.com/gyfora/flink/tree/compaction

The user can define the frequency of compaction (compact every so many checkpoints), and it also makes sure that compaction and cleanup are executed on only one subtask to avoid double work.
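To make the Derby/MySQL issue concrete, here is a minimal sketch of the Derby-style compaction query, using Python's sqlite3 as a stand-in database. The table and column names (`kvstate`, `k`, `checkpoint_id`, `v`) are hypothetical, not the actual schema from the PR; the point is only the shape of the correlated-subquery DELETE that Derby (and SQLite) accept but MySQL rejects.

```python
import sqlite3

# Hypothetical state-backend table; names are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kvstate (k TEXT, checkpoint_id INTEGER, v BLOB)")
conn.executemany(
    "INSERT INTO kvstate VALUES (?, ?, ?)",
    [("a", 1, b"old"), ("a", 2, b"new"), ("b", 1, b"only")],
)

# Derby-style compaction: delete every row for which a newer checkpoint
# of the same key exists. SQLite, like Derby, allows the correlated
# subquery on the table being modified; MySQL rejects this form
# (error 1093) and needs a self-join instead, roughly:
#   DELETE t1 FROM kvstate t1
#   JOIN kvstate t2 ON t1.k = t2.k AND t1.checkpoint_id < t2.checkpoint_id;
conn.execute("""
    DELETE FROM kvstate
    WHERE checkpoint_id < (
        SELECT MAX(s2.checkpoint_id) FROM kvstate s2 WHERE s2.k = kvstate.k
    )
""")

remaining = conn.execute(
    "SELECT k, checkpoint_id FROM kvstate ORDER BY k").fetchall()
print(remaining)  # [('a', 2), ('b', 1)]
```

Only the latest checkpoint per key survives, which is the effect the compaction step is after; the incompatibility is purely in which SQL dialect accepts which phrasing of the same delete.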