[ https://issues.apache.org/jira/browse/FLINK-27681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789903#comment-17789903 ]

Hangxiang Yu edited comment on FLINK-27681 at 11/27/23 4:33 AM:
----------------------------------------------------------------

[~pnowojski] [~fanrui] Thanks for joining the discussion.

Thanks [~mayuehappy] for explaining the case, which we have also seen in our 
production environment.

Let me also try to share my thoughts on your questions.
{quote}I'm worried that flink add check can't completely solve the problem of 
file corruption. Is it possible that file corruption occurs after flink check 
but before uploading the file to hdfs?
{quote}
I think that concern is valid.

Actually, file corruption may occur at any of these stages:
 # File generation at runtime (RocksDB memtable flush or compaction)
 # Uploading during checkpointing (local file -> memory buffer -> network transfer 
-> DFS)
 # Downloading during recovery (the reverse of path 2)

 

For the first situation:
 * File corruption will not affect the read path, because block checksums are 
verified when RocksDB reads a block; the job fails over as soon as it reads a 
corrupted block.
 * So the core problem is that a corrupted file which is never read at runtime 
will be uploaded to the remote DFS during checkpointing. It then breaks normal 
processing after a failover, which can have severe consequences, especially for 
high-priority jobs.
 * That's why we'd like to focus on not uploading corrupted files (or simply 
failing the job so that it restores from the last complete checkpoint). A minimal 
sketch of such a pre-upload check is shown below.
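
Just to make the pre-upload check concrete, here is a minimal sketch that relies 
only on RocksDB's public JNI API (SstFileReader#verifyChecksum); the class name 
is hypothetical and this is not existing Flink code:
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;

/**
 * Sketch only: verify a local SST file's block checksums before it is
 * handed to the checkpoint uploader. Not the actual Flink implementation.
 */
public final class SstPreUploadCheck {

    /** Returns true if all of RocksDB's own block checksums match. */
    public static boolean isSstFileHealthy(String sstFilePath) {
        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(sstFilePath);
            reader.verifyChecksum(); // walks all blocks and validates their checksums
            return true;
        } catch (RocksDBException e) {
            // Corruption detected: skip the upload / fail the checkpoint so the
            // job restores from the last complete checkpoint instead.
            return false;
        }
    }
}
{code}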

For the second and third situations:
 * The ideal way is to unify the checksum mechanism of the local DB and the 
remote DFS.
 * Many file systems support passing a file checksum and verifying it on the 
remote server. We could use this to verify the checksum end-to-end.
 * But this may introduce a new API in some public classes like FileSystem, 
which is a bigger topic.
 * Since we have also seen many cases like the one [~mayuehappy] mentioned, I 
think we could resolve that case first, and then pursue the ideal end-to-end 
approach if it is worth doing. A rough sketch of the end-to-end idea follows 
this list.
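
To illustrate the end-to-end idea with nothing but JDK classes (again only a 
sketch; a real change would have to go through Flink's FileSystem abstraction, 
and the class name here is hypothetical):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32C;
import java.util.zip.CheckedInputStream;

/**
 * Sketch only: checksum exactly the bytes that are streamed to the DFS, so the
 * same value can be re-verified after download (or by a server-side check).
 */
public final class EndToEndChecksum {

    /** Copies in -> out and returns the CRC32C of the bytes that were written. */
    public static long copyWithChecksum(InputStream in, OutputStream out) throws IOException {
        CheckedInputStream checked = new CheckedInputStream(in, new CRC32C());
        byte[] buffer = new byte[8192];
        int read;
        while ((read = checked.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
        return checked.getChecksum().getValue();
    }
}
{code}
The value returned on upload could be stored with the checkpoint metadata and 
compared against the same computation on download, or handed to file systems 
that can verify a checksum on the server side.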



> Improve the availability of Flink when the RocksDB file is corrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27681
>                 URL: https://issues.apache.org/jira/browse/FLINK-27681
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Ming Li
>            Assignee: Yue Ma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: image-2023-08-23-15-06-16-717.png
>
>
> We have encountered several cases where the RocksDB checksum does not match or 
> block verification fails when the job is restored. The reason is generally a 
> problem on the machine where the task runs, which causes the files uploaded to 
> HDFS to be incorrect, and by the time we notice the problem a long time (a 
> dozen minutes to half an hour) has already passed. I'm not sure if anyone else 
> has had a similar problem.
> Since such a file is referenced by incremental checkpoints for a long time, 
> once the maximum number of retained checkpoints is exceeded we can only keep 
> using the file until it is no longer referenced. When the job fails, it cannot 
> be recovered.
> Therefore we consider:
> 1. Can RocksDB periodically check whether all files are correct and find the 
> problem in time?
> 2. Can Flink automatically roll back to the previous checkpoint when there is 
> a problem with the checkpoint data? Even with manual intervention, we can only 
> try to recover from an existing checkpoint or discard the entire state.
> 3. Can we introduce a maximum number of references to a file based on the 
> maximum number of retained checkpoints? When the number of references exceeds 
> the maximum number of retained checkpoints minus 1, the task side would be 
> required to upload a new file for this reference. We are not sure whether this 
> approach would ensure that the newly uploaded file is correct.


