[ 
https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794622#comment-17794622
 ] 

Piotr Nowojski commented on FLINK-27681:
----------------------------------------

{quote}

So I understand that we can add this manual check first, as part of this ticket. 
If the file is detected to be corrupted, we can fail the job. Is this a good 
choice? 
{quote}

I would prefer to avoid the approach that was proposed here in the ticket:

[i] before uploading the file, scan it completely, block by block, for data 
consistency using RocksDB's mechanisms. 

The problems:
 * Performance overhead
 * Works only for RocksDB
 * Doesn't fully protect us from data corruption. Corruption can still happen 
just after we check the file locally, but before it is uploaded to the DFS

So rephrasing what I was proposing in my messages above:

[ii] 
 # Calculate some checksum *[A]* ON THE FLY, at the same time that the state 
file is being written/created. For RocksDB that would require hooking into 
RocksDB itself; it would be easier for the HashMap state backend. Either way it 
would have zero additional IO cost and only a minor CPU cost (negligible 
compared to the IO access)
 # Remember the checksum *[A]* until the upload has been verified
 # Depending on what the DFS supports, either:
 ** preferably, verify against the checksum *[A]* ON THE FLY, while the file is 
being uploaded to the DFS. In principle, if implemented properly by the DFS, 
this should again be basically free, without any additional IO cost. S3 might 
actually support that via [1] or [2].
 ** after uploading, use the DFS API to remotely calculate the checksum of the 
uploaded file and compare it against the checksum *[A]* (see the sketch after 
the quote below). S3 does support it [3], quoting:

{quote}

After uploading objects, you can get the checksum value and compare it to a 
precalculated or previously stored checksum value calculated using the same 
algorithm.

{quote}
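
To make the second option above more concrete, below is a rough sketch (not a 
final design) of what the post-upload verification from [3] could look like 
with the additional-checksums feature of the AWS SDK for Java v2. All names 
({{bucket}}, {{key}}, {{stateFile}}, {{localCrc32cBase64}}) are hypothetical 
placeholders, and the exact SDK calls should be double checked against [1]-[3]:

{code:java}
import java.io.IOException;
import java.nio.file.Path;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class S3ChecksumVerificationSketch {

    /**
     * Sketch of step [ii].3 (second variant): upload the state file with a
     * CRC32C additional checksum and compare what S3 reports for the stored
     * object against the checksum [A] computed while the file was written.
     */
    public static void uploadAndVerify(
            S3Client s3,
            String bucket,
            String key,
            Path stateFile,
            String localCrc32cBase64) throws IOException {

        PutObjectResponse response = s3.putObject(
                PutObjectRequest.builder()
                        .bucket(bucket)
                        .key(key)
                        // Request a CRC32C checksum to be computed and validated for this upload.
                        .checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
                        .build(),
                RequestBody.fromFile(stateFile));

        // The checksum reported for the stored object must match checksum [A];
        // a mismatch means the file changed between the local write and the upload.
        if (!localCrc32cBase64.equals(response.checksumCRC32C())) {
            throw new IOException(
                    "Checkpoint file " + key + " corrupted between local write and upload");
        }
    }
}
{code}

The preferred verify-ON-THE-FLY variant would instead pass the precomputed 
value via {{PutObjectRequest.Builder#checksumCRC32C(...)}}, so that S3 itself 
rejects the upload when the received bytes do not match.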

 

Note, we would have to ensure that the checksum *[A]* is always calculated the 
same way, both in the state backend (RocksDB) and in the DFS (S3). I have no 
idea if RocksDB supports something like that, but if not:
 * it should be a welcome contribution for the RocksDB maintainers
 * implementing a hook on our side in our FRocksDB fork doesn't sound too 
complicated. I would hope it would only require wrapping some {{OutByteStream}} 
class and that's it (a minimal sketch of the idea follows).
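
For illustration only, here is a minimal sketch of such a wrapper built on 
plain JDK classes (the real hook would have to live inside FRocksDB around its 
file-writer abstraction, so the class name here is purely hypothetical). 
{{java.util.zip.CRC32C}} implements the same CRC32C algorithm as S3's 
additional checksums, and base64-encoding the 4 big-endian bytes gives the 
format S3 reports:

{code:java}
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;
import java.util.zip.CheckedOutputStream;

/**
 * Hypothetical wrapper that computes checksum [A] on the fly while the state
 * file is being written, at zero extra IO cost.
 */
public class ChecksummingStateFileStream extends CheckedOutputStream {

    public ChecksummingStateFileStream(OutputStream delegate) {
        // Every write() is forwarded to the delegate and folded into the CRC32C.
        super(delegate, new CRC32C());
    }

    /** Returns checksum [A], base64-encoded the way S3 reports CRC32C values. */
    public String crc32cBase64() {
        int crc = (int) getChecksum().getValue();
        return Base64.getEncoder().encodeToString(
                ByteBuffer.allocate(Integer.BYTES).putInt(crc).array());
    }
}
{code}

The same kind of wrapper around the DFS output stream during the upload would 
give the verify-ON-THE-FLY variant from step [ii].3.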

 

[1] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#checking-object-integrity-md5]

[2] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#trailing-checksums]
 

[3] 
[https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#using-additional-checksums]

 

 

> Improve the availability of Flink when the RocksDB file is corrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27681
>                 URL: https://issues.apache.org/jira/browse/FLINK-27681
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Ming Li
>            Assignee: Yue Ma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several cases where the RocksDB checksum does not match 
> or the block verification fails when the job is restored. The cause is 
> generally a problem with the machine where the task is located, which makes 
> the files uploaded to HDFS incorrect, but a long time (a dozen minutes to 
> half an hour) has usually passed before we notice the problem. I'm not sure 
> if anyone else has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded, we have no 
> choice but to keep using this file until it is no longer referenced. When the 
> job fails, it cannot be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data? Even with manual intervention, all we can 
> do is try to recover from an existing checkpoint or discard the entire state.
> 3. Can we limit the maximum number of references to a file based on the 
> maximum number of retained checkpoints? When the number of references exceeds 
> the maximum number of checkpoints - 1, the Task side would be required to 
> upload a new file for this reference. We are not sure whether this would 
> ensure that the newly uploaded file is correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
