Addison Higham created FLINK-11187:
--------------------------------------

             Summary: StreamingFileSink with S3 backend transient socket timeout issues
                 Key: FLINK-11187
                 URL: https://issues.apache.org/jira/browse/FLINK-11187
             Project: Flink
          Issue Type: Bug
          Components: FileSystem, Streaming Connectors
    Affects Versions: 1.7.0, 1.7.1
            Reporter: Addison Higham
            Assignee: Addison Higham
             Fix For: 1.7.2


When using the StreamingFileSink with the S3A backend, errors like this will occasionally occur:
{noformat}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended Request ID: xxx), S3 Extended Request ID: xxx
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
This causes a restart of the Flink job, which it is usually able to recover from, but under heavy load these restarts can become very frequent.

 

Turning on debug logs, you can find the following relevant stack trace:
{noformat}
2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
	at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
	at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
	at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
	at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748){noformat}
This error occurs when a transient failure in writing a multipart chunk happens and the underlying InputStream cannot be reset. This ResetException should be thrown to the client (as documented here: [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html]), but for some reason it is not; instead, the client retries the request, but now with a fully consumed InputStream. Because this InputStream is empty/smaller, we can't fill the Content-Length that the multipart upload is expecting, so the socket hangs until it is eventually timed out.
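For illustration, here is a minimal, self-contained sketch (plain JDK code, not Flink or SDK code) of the mark/reset behavior at the root of this: BufferedInputStream can only reset to its mark while fewer bytes than the read limit have been buffered, which is exactly the "Resetting to invalid mark" failure in the debug log above:
{code:java}
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class MarkResetDemo {
    public static void main(String[] args) throws IOException {
        // Data larger than BufferedInputStream's default 8 KB buffer,
        // standing in for a multipart chunk body.
        byte[] data = new byte[16 * 1024];
        BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(data));

        // Mark with a read limit far smaller than what we will consume.
        in.mark(128);

        // Simulate streaming the whole part body to the socket; once the
        // buffer refills past the read limit, the mark is invalidated.
        byte[] buf = new byte[4096];
        while (in.read(buf) != -1) { /* consume fully */ }

        // This is what the SDK does on a transient failure before retrying;
        // it throws "java.io.IOException: Resetting to invalid mark".
        in.reset();
    }
}
{code}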

 

This failure happens roughly 20 times before the AWS client retry logic finally fails the request and the socket timeout exception is thrown.

 

As mentioned in the AWS best-practices doc, the best fix for this is to supply a File or FileInputStream object, or to call setReadLimit on the request. I tried using a global SDK property (com.amazonaws.sdk.s3.defaultStreamBufferSize) to set this value, but that did not fix the problem, which I believe is because the InputStream is not mark-able and the AWS client doesn't wrap the stream.
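For reference, the per-request knob that doc describes looks roughly like the following. This is only a sketch against the AWS SDK v1 API (unshaded names for readability; the method parameters are placeholders, not existing Flink code), and it assumes the AWS guidance that the read limit should be one byte larger than the part size:
{code:java}
import java.io.InputStream;
import com.amazonaws.services.s3.model.UploadPartRequest;

// Sketch: build an upload-part request with an explicit mark/reset read
// limit, so the SDK can rewind the stream on a transient failure instead
// of retrying with a consumed stream.
static UploadPartRequest partWithReadLimit(String bucket, String key, String uploadId,
        int partNumber, InputStream inputStream, long partSize) {
    UploadPartRequest request = new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)
            .withInputStream(inputStream)
            .withPartSize(partSize);
    // AWS recommends one byte more than the data that will be read.
    request.getRequestClientOptions().setReadLimit((int) partSize + 1);
    return request;
}
{code}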

 

What is confirmed to work is the following patch: 
[https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6]

 

That is obviously not ideal, but it may suffice to just make that configurable.

 

The other option is to instead use the S3A WriteOperationHelper's ability to upload a part from a file: pass a File to the S3AccessHelper and change the other relevant classes (RefCountedFSOutputStream) to expose the File object and hand it directly to the S3A WriteOperationHelper. A rough sketch of this option follows.
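The sketch below uses hypothetical names (partFromFile, stagingFile); the real change would touch S3AccessHelper, HadoopS3AccessHelper, and RefCountedFSOutputStream. The point is that given a File (rather than an InputStream), the SDK can re-open and re-read the body on every retry, so no mark/reset or read-limit tuning is needed:
{code:java}
import java.io.File;
import com.amazonaws.services.s3.model.UploadPartRequest;

// Hypothetical sketch: build the part upload from the local staging file
// itself, so the SDK can replay the body on retry by re-opening the file.
static UploadPartRequest partFromFile(String bucket, String key, String uploadId,
        int partNumber, File stagingFile) {
    return new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)
            .withFile(stagingFile)
            .withPartSize(stagingFile.length());
}
{code}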


