[ 
https://issues.apache.org/jira/browse/FLINK-38225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-38225:
---------------------------------
    Fix Version/s:     (was: 1.20.4)
                       (was: 2.2.1)

> GSBlobStorageImpl needs to add precondition check to make 503 retryable
> -----------------------------------------------------------------------
>
>                 Key: FLINK-38225
>                 URL: https://issues.apache.org/jira/browse/FLINK-38225
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.18.0, 1.19.0, 1.20.0, 2.0.1
>            Reporter: Xi Zhang
>            Assignee: Oleksandr Nitavskyi
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0
>
>
> We discovered that storage.writer and compose do not retry 503 errors. This 
> happens quite frequently and causes checkpoint failures. 
> Here are our investigation findings:
>  * We noticed frequent checkpoint failures due to GCS 503 errors; tuning retry 
> settings didn't seem to take effect.
>  * We upgraded storage client to 2.52.1 and built a custom image. The new 
> version revealed more detailed error: 
>  ** Suppressed: 
> com.google.cloud.storage.RetryContext$RetryBudgetExhaustedComment: 
> Unretryable error (attempts: 1, maxAttempts: 12, elapsed: PT0.683S, 
> nextBackoff: PT1.173526768S, timeout: PT2M30S)
> Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
> 503 Service Unavailable
>  * After checking with Google, we learned that preconditions must be set on 
> write and compose; otherwise the 503 error is deemed not retryable.
>  * The fix is as below
> {code:java}
> Storage.BlobTargetOption precondition;
> if (storage.get(targetBlobIdentifier.bucketName, targetBlobIdentifier.objectName) == null) {
>     // For a target object that does not yet exist, set the DoesNotExist precondition.
>     // This will cause the request to fail if the object is created before the request runs.
>     precondition = Storage.BlobTargetOption.doesNotExist();
> } else {
>     // If the destination already exists in your bucket, instead set a generation-match
>     // precondition. This will cause the request to fail if the existing object's
>     // generation changes before the request runs.
>     precondition =
>             Storage.BlobTargetOption.generationMatch(
>                     storage.get(
>                                     targetBlobIdentifier.bucketName,
>                                     targetBlobIdentifier.objectName)
>                             .getGeneration());
> }
> Storage.ComposeRequest request = builder.setTargetOptions(precondition).build();
> storage.compose(request);
> {code}
> The precondition needs to be added in compose: 
> [https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java#L157]
>  * A similar fix applies to write as well: 
> [https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java#L64]
>  * We don't see checkpoint failures after the fix. 
>  * We'd like to apply the fix to the open source code, since we don't want our 
> image to diverge too much from the open source version. I can help contribute 
> if needed. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
