[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793626#comment-17793626
 ] 

Piotr Nowojski commented on FLINK-27681:
----------------------------------------

{quote}

IIUC, checksum in SST level could guarantee the correctness of local file.

{quote}

Yes, I think we don't need any extra protection for corruption of the local 
files. From the document you shared, RocksDB will throw an error every time we 
try to read a corrupted block.

{quote}

And checksum in filesystem level could guarantee the correctness of uploading 
and downloading.

{quote}

I'm no longer so sure about this. Thinking about it more, checksums on the 
filesystem level or the HDD/SSD level wouldn't protect us from corruption 
happening after the bytes are read from the local file, but before those bytes 
are acknowledged by the DFS/object store.

A neat way would be to calculate the checksum locally, when writing the SST 
file to the local file system ("Full File Checksum Design" from the document 
[~masteryhx] shared?), without any significant overhead (the bytes that we want 
to verify would, after all, already be in RAM). Next, if we could cheaply verify 
that the file uploaded to the DFS still has the same checksum as computed 
during creation of that file, we could make sure that no matter what, we always 
have valid files in the DFS that we can fall back to every time RocksDB detects 
data corruption when accessing an SST file locally.
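
To make the idea concrete, here is a minimal sketch using only the JDK. It is not how RocksDB or Flink do it: the class and method names are hypothetical, CRC32 is just a stand-in for whatever algorithm we would pick, and the "uploaded copy" is simply any stream we can read back. The checksum is computed as a side effect of the write, so the bytes are not read a second time locally.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Hypothetical helper, for illustration only.
final class ChecksumOnWriteSketch {

    /** Writes data to out and returns the CRC32 of exactly the bytes written. */
    static long writeWithChecksum(byte[] data, OutputStream out) throws IOException {
        CheckedOutputStream checked = new CheckedOutputStream(out, new CRC32());
        checked.write(data);
        checked.flush();
        return checked.getChecksum().getValue();
    }

    /** Reads the stream to the end and returns the CRC32 of the bytes read. */
    static long checksumOf(InputStream in) throws IOException {
        try (CheckedInputStream checked = new CheckedInputStream(in, new CRC32())) {
            byte[] buffer = new byte[8192];
            while (checked.read(buffer) != -1) {
                // Reading drives the checksum update; the bytes themselves are discarded.
            }
            return checked.getChecksum().getValue();
        }
    }

    /** True if the uploaded copy still matches the checksum recorded at creation time. */
    static boolean uploadIsIntact(long checksumAtCreation, InputStream uploadedCopy)
            throws IOException {
        return checksumOf(uploadedCopy) == checksumAtCreation;
    }
}
```

Reading the uploaded file back like this is of course not cheap. The sketch only shows the comparison we are after, and the open question is whether the DFS can do the remote half for us.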

It looks like this might be doable in one of two ways ([1] or [2]), at least 
for S3.

[1] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]

I don't know if AWS's check against the {{Content-MD5}} field is free. As 
far as I understand it, it could be implemented almost for free, but the 
docs do not mention that.
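
For reference, a small sketch of how the value for that field could be produced, assuming (as I read the docs) that {{Content-MD5}} carries the Base64-encoded 128-bit MD5 digest of the body and that S3 rejects the upload when it does not match. The class name is hypothetical and nothing here talks to S3.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class ContentMd5Sketch {

    /** Returns the Base64-encoded MD5 digest of the body. */
    static String contentMd5(byte[] body) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(body);
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }
}
```

In practice we would want to feed the digest incrementally while the SST file is being written, rather than over a byte array held in memory.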

[2] 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums
 

Here the docs say that this is free, but it looks like this calculates a new 
checksum during the upload process. So the question would be: could we 
retrieve that checksum and compare it against our locally computed one?
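
If the checksum can be retrieved, the comparison itself would be trivial. Below is a rough sketch under two assumptions that I have not verified against a real bucket: that the store reports a CRC32 as the Base64 encoding of its 4 big-endian bytes, and that the value covers the whole object (for multipart uploads this may not hold). {{remoteChecksum}} is a hypothetical stand-in for whatever string the store returns, and the class name is made up.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32;

// Hypothetical helper, for illustration only.
final class RemoteChecksumCompareSketch {

    /** Encodes the CRC32 of data as Base64 over its 4 big-endian bytes. */
    static String base64Crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        // ByteBuffer is big-endian by default; the cast keeps the low 32 bits.
        byte[] bigEndian = ByteBuffer.allocate(4).putInt((int) crc.getValue()).array();
        return Base64.getEncoder().encodeToString(bigEndian);
    }

    /** True if the locally computed checksum equals the one reported by the store. */
    static boolean matches(byte[] localData, String remoteChecksum) {
        return base64Crc32(localData).equals(remoteChecksum);
    }
}
```

A mismatch here would be the signal to fail the checkpoint instead of registering a possibly corrupted file.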

 

[~mayuehappy], if we decide to go in this direction, then the change to fail a 
job after a checksum mismatch during the async phase could be implemented 
easily here: 
{{org.apache.flink.runtime.checkpoint.CheckpointFailureManager}}. I don't 
think we need an extra ticket for that; a separate commit in the same PR will 
suffice.

> Improve the availability of Flink when the RocksDB file is corrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27681
>                 URL: https://issues.apache.org/jira/browse/FLINK-27681
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Ming Li
>            Assignee: Yue Ma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have several times encountered cases where the RocksDB checksum does not 
> match or the block verification fails when the job is restored. The cause is 
> generally a problem with the machine where the task is located, which causes 
> the files uploaded to HDFS to be incorrect, but by the time we found the 
> problem a long time (a dozen minutes to half an hour) had already passed. I'm 
> not sure if anyone else has had a similar problem.
> Since this file is referenced by incremental checkpoints for a long time, 
> when the maximum number of checkpoints reserved is exceeded, we can only use 
> this file until it is no longer referenced. When the job fails, it cannot be 
> recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data, because even with manual intervention, it 
> just tries to recover from the existing checkpoint or discard the entire 
> state.
> 3. Can we increase the maximum number of references to a file based on the 
> maximum number of checkpoints reserved? When the number of references exceeds 
> the maximum number of checkpoints -1, the Task side is required to upload a 
> new file for this reference. Not sure if this way will ensure that the new 
> file we upload will be correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
