[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793189#comment-17793189 ]

Hangxiang Yu commented on FLINK-27681:
--------------------------------------

Sorry for the late reply.
{quote}However, the job must fail in the future (when the corrupted block is 
read or compacted, or the number of failed checkpoints >= 
tolerable-failed-checkpoints). Then it will roll back to the older checkpoint.

The older checkpoint must be from before we found that the file is corrupted. 
Therefore, it is useless to keep the job running between the time the 
corruption is discovered and the time the job actually fails.

In brief, tolerable-failed-checkpoint can work, but the extra cost isn't 
necessary.

BTW, if we fail the job directly, this 
[comment|https://github.com/apache/flink/pull/23765#discussion_r1404136470] 
will be resolved directly.
{quote}
Thanks for the detailed clarification.

I have rethought this; it seems that failing the job is more reasonable than 
failing the current checkpoint. I'm +1 if we can do that.
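
For reference, here is a minimal sketch (assuming a recent Flink version and 
the standard DataStream API; this is not code from the PR) of how the 
tolerable-failed-checkpoint threshold discussed above is configured; once it 
is exceeded, the job fails and falls back to the latest complete checkpoint:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableFailedCheckpointsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // Tolerate up to 3 checkpoint failures before the job itself is failed
        // (the programmatic counterpart of execution.checkpointing.tolerable-failed-checkpoints).
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // Placeholder pipeline so the sketch is runnable.
        env.fromSequence(0, 100).print();

        env.execute("tolerable-failed-checkpoints example");
    }
}
{code}
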
{quote}That's a non-trivial overhead. Prolonging checkpoints by 10s in many 
cases (especially low-throughput, large-state jobs) will be prohibitively 
expensive, delaying rescaling, e2e exactly-once latency, etc. 1s+ for 1GB might 
also be less than ideal to enable by default.
{quote}
Cannot agree more.
{quote}Actually, don't all disks basically have some form of CRC these days? 
I'm certain that's true for SSDs. Having said that, can you [~masteryhx] 
rephrase and elaborate on those 3 scenarios that you think we need to protect 
against? Especially, where does the corruption happen?
{quote}
IIUC, once we have I/O operations on the SST files, a file may be corrupted, 
even though this happens very rarely.

RocksDB also describes some scenarios for using the full-file checksum [1] 
that are relevant to our usage:
 # Local files that are about to be uploaded: as [1] puts it, "verify the SST 
file when the whole file is read in DB (e.g., compaction)"; block-level 
checksums at runtime cannot guarantee the correctness of the whole SST file, 
which is what we could focus on first.
 # Uploading and downloading: first, disk I/O and network I/O may corrupt the 
data; second, remote storage is not always reliable. So the checksum could be 
used when SST files are copied to other places (e.g., backup, move, or 
replicate) or stored remotely (see the sketch after this list).
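
To make point 2 above concrete, here is a rough, hypothetical sketch of the 
filesystem-level idea (this is not Flink's actual upload path; the helper 
names and call sites are placeholders): compute a CRC32 while streaming the 
local SST file to remote storage, record the value, and verify it again after 
downloading the file back, before handing it to RocksDB:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

public class ChecksumHandoffSketch {

    /** Streams a local SST file into 'remoteOut' and returns the CRC32 of the bytes sent. */
    static long uploadWithChecksum(Path localSst, OutputStream remoteOut) throws IOException {
        CRC32 crc = new CRC32();
        try (InputStream in = new CheckedInputStream(Files.newInputStream(localSst), crc)) {
            in.transferTo(remoteOut); // every byte read also updates the CRC
        }
        return crc.getValue();
    }

    /** Recomputes the CRC32 of a downloaded SST file and compares it with the recorded value. */
    static void verifyDownloaded(Path downloadedSst, long expectedCrc) throws IOException {
        CRC32 crc = new CRC32();
        try (InputStream in = new CheckedInputStream(Files.newInputStream(downloadedSst), crc)) {
            in.transferTo(OutputStream.nullOutputStream());
        }
        if (crc.getValue() != expectedCrc) {
            throw new IOException("Corrupted SST file detected after download: " + downloadedSst);
        }
    }
}
{code}

The recorded checksum could then live next to the checkpoint metadata, so the 
restore path can reject a corrupted download early instead of waiting for 
RocksDB to hit a bad block.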

IIUC, a checksum at the SST level could guarantee the correctness of the local 
file, and a checksum at the filesystem level could guarantee the correctness 
of uploading and downloading.

[1] 
https://github.com/facebook/rocksdb/wiki/Full-File-Checksum-and-Checksum-Handoff

> Improve the availability of Flink when the RocksDB file is corrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27681
>                 URL: https://issues.apache.org/jira/browse/FLINK-27681
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Ming Li
>            Assignee: Yue Ma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered, several times, cases where the RocksDB checksum does 
> not match or block verification fails when the job is restored. The reason 
> is generally that there is some problem with the machine where the task 
> runs, which causes the files uploaded to HDFS to be incorrect, but by the 
> time we notice the problem a long time has passed (a dozen minutes to half 
> an hour). I'm not sure if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded, we can only 
> keep using this file until it is no longer referenced. When the job fails, 
> it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there 
> is a problem with the checkpoint data? Even with manual intervention, we can 
> only try to recover from an existing checkpoint or discard the entire state.
> 3. Can we limit the maximum number of references to a file based on the 
> maximum number of retained checkpoints? When the number of references 
> exceeds (maximum number of retained checkpoints - 1), the task side would be 
> required to upload a new file for this reference. I'm not sure whether this 
> approach ensures that the newly uploaded file is correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
