Aljoscha,

> Instead the sink would have to check for each set of committables
> separately if they had already been committed. Do you think this is
> feasible?

Yes, that is how it works in our internal implementation [1]. We don't use
checkpointId. We generate a manifest file (GlobalCommT) to bundle all the
data files that the committer received in one checkpoint cycle. Then we
generate a unique manifest id by hashing the location of the manifest
file. The manifest ids are stored in Iceberg snapshot metadata. Upon
restore, we check each of the restored manifest files against Iceberg table
snapshot metadata to determine if we should discard or keep the restored
manifest files. If a commit has multiple manifest files (e.g. accumulated
from previous failed commits), we store the comma-separated manifest ids in
Iceberg snapshot metadata.
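
To make that concrete, the dedup logic is roughly the following (a
simplified sketch, not the actual code from [1]; class and method names
are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

// Sketch of the manifest dedup described above. Names are illustrative.
class ManifestDedupSketch {

    // Stable unique id derived from the manifest file location.
    static String manifestId(String manifestLocation) {
        return UUID.nameUUIDFromBytes(
            manifestLocation.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // On restore: keep only the restored manifests whose ids are NOT
    // already recorded in Iceberg snapshot metadata (committedIds is the
    // union of the comma-separated ids from the snapshot summaries).
    static List<String> filterUncommitted(
            List<String> restoredManifests, Set<String> committedIds) {
        return restoredManifests.stream()
            .filter(loc -> !committedIds.contains(manifestId(loc)))
            .collect(Collectors.toList());
    }
}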

> During normal operation this set would be very small, it would usually
> only be the committables for the last checkpoint. Only when there is an
> outage would multiple sets of committables pile up.

You are absolutely right here. Even if there are multiple sets of
committables, it is usually only the last few (or few dozen) snapshots
that we need to check. Even with our current inefficient implementation,
which traverses all table snapshots (on the scale of thousands) from
oldest to latest, the check took 60 ms on average and 800 ms at most, so
it is really not a concern for Iceberg.
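
For reference, that traversal is essentially the following (a sketch
against the Iceberg Table API; the summary key "flink.manifest-ids" is
made up for illustration, it is not the actual key we use):

import java.util.HashSet;
import java.util.Set;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CommittedIdsSketch {
    // Walk all snapshots and collect the manifest ids recorded in the
    // snapshot summaries. "flink.manifest-ids" is an illustrative key.
    static Set<String> committedManifestIds(Table table) {
        Set<String> ids = new HashSet<>();
        for (Snapshot snapshot : table.snapshots()) {
            String committed = snapshot.summary().get("flink.manifest-ids");
            if (committed != null) {
                for (String id : committed.split(",")) {
                    ids.add(id);
                }
            }
        }
        return ids;
    }
}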

> CommitStatus commitGlobally(List<Committable>, Nonce)

Just to clarify the terminology here: I assume Committable means the
`GlobalCommT` (like ManifestFile in Iceberg) from previous discussions,
right? `CommT` would then be the Iceberg DataFile that goes from writer
to committer.

This can work assuming we *don't have concurrent executions of
commitGlobally*, even with concurrent checkpoints. Here is the
failure-recovery scenario I want to avoid.

Assume checkpoints 1, 2, and 3 all completed, each generating a manifest
file: manifest-1, manifest-2, and manifest-3.
timeline ----------------------------------------------------------> now

commitGlobally(manifest-1, nonce-1) started
    commitGlobally(manifest-2, nonce-2) started
        commitGlobally(manifest-2, nonce-2) failed
            commitGlobally(manifest-2 and manifest-3, nonce-3) started
                commitGlobally(manifest-1, nonce-1) failed
                    commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded

Now the job failed and was restored from checkpoint 3, whose state
contains manifest files 1, 2, and 3. Checking the Iceberg table snapshot
metadata, we find that nonce-3 was committed. But the nonce alone doesn't
tell us which manifest files it covered, so we cannot correctly determine
that manifest-2 and manifest-3 were committed while manifest-1 was not.

If it is possible to have concurrent executions of commitGlobally, the
alternative is to generate a unique id/nonce per GlobalCommT. Then we can
check each individual GlobalCommT (ManifestFile) against the Iceberg
snapshot metadata.
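
In interface terms, that alternative could look something like this (just
a sketch to illustrate the idea; CommitStatus and GlobalCommT follow the
naming from this thread, everything else is made up):

import java.util.List;

// Sketch of the per-GlobalCommT dedup alternative described above.
interface GlobalCommitterSketch<GlobalCommT> {

    enum CommitStatus { SUCCESS, TERMINAL_FAILURE, RETRY }

    // The sink provides a stable unique id per GlobalCommT (e.g. a hash
    // of the manifest file location) instead of one nonce per
    // commitGlobally() call.
    String uniqueId(GlobalCommT committable);

    // On restore, each restored GlobalCommT is checked individually
    // against the external system (Iceberg snapshot metadata in our
    // case) before the framework retries the commit.
    boolean isCommitted(GlobalCommT committable);

    CommitStatus commitGlobally(List<GlobalCommT> committables);
}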

Thanks,
Steven

[1]
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569

On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Steven,
>
> we were also wondering if it is a strict requirement that "later"
> updates to Iceberg subsume earlier updates. In the current version, you
> only check whether checkpoint X made it to Iceberg and then discard all
> committable state from Flink state for checkpoints smaller than X.
>
> If we go with a (somewhat random) nonce, this would not work. Instead
> the sink would have to check for each set of committables separately if
> they had already been committed. Do you think this is feasible? During
> normal operation this set would be very small, it would usually only be
> the committables for the last checkpoint. Only when there is an outage
> would multiple sets of committables pile up.
>
> We were thinking to extend the GlobalCommitter interface to allow it to
> report success or failure and then let the framework retry. I think this
> is something that you would need for the Iceberg case. The signature
> could be like this:
>
> CommitStatus commitGlobally(List<Committable>, Nonce)
>
> where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, and
> RETRY.
>
> Best,
> Aljoscha
>
