[ https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738131#comment-16738131 ]

Steve Loughran commented on FLINK-11187:
----------------------------------------

bq. As far as I can tell, this is just a transient s3 error, so it appears to 
be pretty rare but does happen more often with increased load.

It's believed to be a network error: the upload of a single block fails and the 
transfer manager wants to retry, but the block buffering is such that it can't 
re-post that block.

AFAIK there's no easy way to test this other than a long-haul upload over many 
hours, not unless you can insert an unreliable proxy or sneak in an httpclient 
which fails (possible if you have your own modified Apache httpclient, run the 
SDK unshaded, etc.).

BTW: do make sure that the final POST does retries too; some of the AWS SDKs 
have under-retried there, and you don't want a 4h upload to fail at the final 
step.

> StreamingFileSink with S3 backend transient socket timeout issues 
> ------------------------------------------------------------------
>
>                 Key: FLINK-11187
>                 URL: https://issues.apache.org/jira/browse/FLINK-11187
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming Connectors
>    Affects Versions: 1.7.0, 1.7.1
>            Reporter: Addison Higham
>            Assignee: Addison Higham
>            Priority: Major
>             Fix For: 1.7.2, 1.8.0
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of the Flink job, which it is often able to recover 
> from, but under heavy load this can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at 
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at 
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at 
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748){noformat}
> This error occurs when a transient failure in writing a multipart chunk 
> happens and the underlying InputStream cannot be reset. This ResetException 
> should be thrown to the client (as documented here: 
> [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html]),
>  but for some reason it is not; instead, the client retries the request, but 
> now with a fully consumed InputStream. Because this InputStream is 
> empty/smaller, we can't fill the Content-Length that the multipart upload 
> is expecting, so the socket hangs until it is eventually timed out.
>  
> This failure happens roughly 20 times before the AWS client's retry logic 
> finally fails the request and the socket timeout exception is thrown.
>  
> As mentioned in the AWS best-practices doc, the best fix for this is to use 
> a File or FileInputStream object, or to set the read limit via setReadLimit. 
> I tried using a global SDK property 
> (com.amazonaws.sdk.s3.defaultStreamBufferSize) to set this value, but that 
> did not fix the problem, which I believe is because the InputStream is not 
> mark-able and the AWS client doesn't wrap the stream.
>  
> What is confirmed to work is the following patch: 
> [https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6]
>  
> That is obviously not ideal, but it may suffice to just make that 
> configurable.
>  
> The other option is to instead expose the S3A WriteOperationHelper option to 
> pass a file to the S3AccessHelper, and to change the other relevant classes 
> (RefCountedFSOutputStream) to expose the File object and hand it directly to 
> the S3A WriteOperationHelper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
