Aljoscha,

> Instead the sink would have to check for each set of committables separately if they had already been committed. Do you think this is feasible?

Yes, that is how it works in our internal implementation [1]. We don't use checkpointId. We generate a manifest file (GlobalCommT) to bundle all the data files that the committer received in one checkpoint cycle. Then we generate a unique manifest id by hashing the location of the manifest file. The manifest ids are stored in Iceberg snapshot metadata. Upon restore, we check each of the restored manifest files against the Iceberg table snapshot metadata to determine whether to discard or keep it. If a commit has multiple manifest files (e.g. accumulated from previous failed commits), we store the comma-separated manifest ids in Iceberg snapshot metadata.

> During normal operation this set would be very small, it would usually only be the committables for the last checkpoint. Only when there is an outage would multiple sets of committables pile up.

You are absolutely right here. Even if there are multiple sets of committables, it is usually only the last few or a dozen snapshots that we need to check. Even with our current inefficient implementation of traversing all table snapshots (on the scale of thousands) from oldest to latest, it took only 60 ms on average and 800 ms at most, so it is really not a concern for Iceberg.

> CommitStatus commitGlobally(List<Committable>, Nonce)

Just to clarify the terminology here: I assume Committable means the `GlobalCommT` (like ManifestFile in Iceberg) from previous discussions, right? `CommT` means the Iceberg DataFile passed from writer to committer.

This can work assuming we *don't have concurrent executions of commitGlobally*, even with concurrent checkpoints. Here is the failure recovery scenario I want to avoid. Assume checkpoints 1, 2, and 3 all completed. Each checkpoint generates a manifest file: manifest-1, manifest-2, and manifest-3.

timeline -------------------------------------------------------------------------> now
commitGlobally(manifest-1, nonce-1) started
commitGlobally(manifest-2, nonce-2) started
commitGlobally(manifest-2, nonce-2) failed
commitGlobally(manifest-2 and manifest-3, nonce-3) started
commitGlobally(manifest-1, nonce-1) failed
commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded

Now the job fails and is restored from checkpoint 3, which contains manifest files 1, 2, and 3. When checking the Iceberg table snapshot metadata, we find that nonce-3 was committed. But in this case we won't be able to correctly determine which manifest files were committed and which were not.

If it is possible to have concurrent executions of commitGlobally, the alternative is to generate the unique id/nonce per GlobalCommT. Then we can check each individual GlobalCommT (ManifestFile) against the Iceberg snapshot metadata.

Thanks,
Steven

[1] https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569

On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Steven,
>
> we were also wondering if it is a strict requirement that "later"
> updates to Iceberg subsume earlier updates. In the current version, you
> only check whether checkpoint X made it to Iceberg and then discard all
> committable state from Flink state for checkpoints smaller X.
>
> If we go with a (somewhat random) nonce, this would not work. Instead
> the sink would have to check for each set of committables separately if
> they had already been committed. Do you think this is feasible? During
> normal operation this set would be very small, it would usually only be
> the committables for the last checkpoint. Only when there is an outage
> would multiple sets of committables pile up.
>
> We were thinking to extend the GlobalCommitter interface to allow it to
> report success or failure and then let the framework retry. I think this
> is something that you would need for the Iceberg case. The signature
> could be like this:
>
> CommitStatus commitGlobally(List<Committable>, Nonce)
>
> where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, and
> RETRY.
>
> Best,
> Aljoscha
>
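
For illustration only, the per-GlobalCommT check described in the reply above could look roughly like the sketch below. It is not the code behind [1]: the class `ManifestDedupSketch`, the `Manifest` type, and the methods `committedIds` and `filterUncommitted` are hypothetical names, and the snapshot metadata is modelled as plain strings holding the comma-separated manifest ids rather than read through the Iceberg API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names come from Flink or Iceberg.
final class ManifestDedupSketch {

    // Stand-in for a restored GlobalCommT (a manifest file) and its unique id.
    static final class Manifest {
        final String id;
        final String location;

        Manifest(String id, String location) {
            this.id = id;
            this.location = location;
        }
    }

    // Collects every manifest id recorded in snapshot metadata. Each list entry
    // is the comma-separated id string stored by one successful commit.
    static Set<String> committedIds(List<String> snapshotIdLists) {
        Set<String> ids = new HashSet<>();
        for (String idList : snapshotIdLists) {
            if (idList == null) {
                continue;
            }
            for (String id : idList.split(",")) {
                String trimmed = id.trim();
                if (!trimmed.isEmpty()) {
                    ids.add(trimmed);
                }
            }
        }
        return ids;
    }

    // Keeps only the restored manifests whose id was never committed.
    static List<Manifest> filterUncommitted(List<Manifest> restored, Set<String> committed) {
        List<Manifest> pending = new ArrayList<>();
        for (Manifest m : restored) {
            if (!committed.contains(m.id)) {
                pending.add(m);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // Scenario from the timeline: checkpoint 3 restores manifests 1, 2, 3,
        // and only the commit of manifest-2 and manifest-3 succeeded.
        List<Manifest> restored = Arrays.asList(
                new Manifest("id-1", "manifest-1"),
                new Manifest("id-2", "manifest-2"),
                new Manifest("id-3", "manifest-3"));
        Set<String> committed = committedIds(Arrays.asList("id-2,id-3"));

        for (Manifest m : filterUncommitted(restored, committed)) {
            System.out.println("still to commit: " + m.location);
        }
    }
}
```

Because each manifest carries its own id, the check does not depend on which commitGlobally call succeeded or in what order, which is the property the concurrent-execution scenario needs.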