Oh this is timely!

I hope I can save you some pain, Kostas! (cc-ing flink dev to get
feedback there on what I believe to be a confirmed bug)


I was just about to open a Flink issue for this after digging (really)
deep and figuring out the issue over the weekend.

The problem arises from the way Flink hands input streams to the
S3AccessHelper. If you turn on debug logging for S3, you will eventually see
this stack trace:

2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
  at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
  at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

From this, you can see that (for some reason) AWS fails to write a
multi-part chunk and then tries to reset the input stream in order to retry,
but fails, because the InputStream is not mark-able.
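
To see why the reset fails, here is a minimal, self-contained sketch (plain
JDK classes, nothing Flink- or AWS-specific) that reproduces the exact
"Resetting to invalid mark" IOException from the top of the trace:
BufferedInputStream silently invalidates its mark once you read past the
mark's read limit, which is what happens when the part being uploaded is
bigger than the buffer covering the mark.

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class MarkResetDemo {
        public static void main(String[] args) throws IOException {
            byte[] part = new byte[16 * 1024];            // stand-in for an MPU part
            BufferedInputStream in =
                new BufferedInputStream(new ByteArrayInputStream(part), 1024);
            in.mark(1024);                                // mark only survives 1024 bytes
            while (in.read(new byte[4 * 1024]) != -1) { } // the "upload" drains the stream
            in.reset();       // throws IOException: Resetting to invalid mark
        }
    }

The SdkBufferedInputStream in the trace delegates straight to
BufferedInputStream.reset (the top two frames), so it fails the same way
once a part outgrows the configured read limit.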

That exception is swallowed (it seems like it should be raised up to the
client, but isn't, for reasons unknown). The S3 client then retries the
request using its built-in retry logic; however, because the InputStream has
already been consumed and has no more bytes to supply, the request never
fills the Content-Length that the S3 PUT declared. Eventually, after it
hits the max number of retries, it fails and you get the error above.

I just started running a fix for this (which is a hack, not the real
solution) here:
https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6

This whole thing is documented here:
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html

However, I found that just setting the documented property didn't appear to
be enough; I also had to wrap the InputStream in a BufferedInputStream
before the retry could actually reset it. A sketch of the resulting request
is below.
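
For reference, the shape of the hack is roughly this. A hedged sketch, not
the exact gist contents: UploadPartRequest, withInputStream, and
setReadLimit are real AWS SDK v1 APIs, but exactly where the wrap has to
happen in the Flink/Hadoop call chain may differ from what I show here.

    import java.io.BufferedInputStream;
    import java.io.InputStream;
    import com.amazonaws.services.s3.model.UploadPartRequest;

    // Builds a part-upload request whose stream the SDK can reset on retry.
    static UploadPartRequest markableUploadPart(
            String bucket, String key, String uploadId,
            int partNumber, InputStream input, long length) {
        UploadPartRequest request = new UploadPartRequest()
            .withBucketName(bucket)
            .withKey(key)
            .withUploadId(uploadId)
            .withPartNumber(partNumber)
            .withPartSize(length)
            // Buffer the whole part so mark/reset survives a full read-through.
            .withInputStream(new BufferedInputStream(input, (int) length + 1));
        // The documented knob from the best-practices page; on its own it
        // didn't seem to be enough in my testing.
        request.getRequestClientOptions().setReadLimit((int) length + 1);
        return request;
    }

The obvious cost is that this holds an entire part (5MB+ by default) on-heap
per in-flight upload, which is why I call it a hack.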

I think the real fix is either to:

1. Keep the BufferedInputStream wrap but make it configurable, or
2. Refactor S3AccessHelper to have another signature that takes a File
object, and change RefCountedFSOutputStream so it can also hand out a
reference to the underlying file (a rough sketch of what I mean follows
this list).
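
To make option 2 concrete, the signature I have in mind looks something like
this (illustrative only; the names are from memory, not the actual Flink
interface):

    import java.io.File;
    import java.io.IOException;
    import com.amazonaws.services.s3.model.UploadPartResult;

    public interface S3AccessHelper {

        // ...the existing stream-based uploadPart(...) stays as-is...

        // With a File, the SDK can reopen and rewind the content itself on
        // retry (UploadPartRequest.withFile), so no in-memory buffering.
        UploadPartResult uploadPart(
                String key,
                String uploadId,
                int partNumber,
                File inputFile,
                long length) throws IOException;
    }

RefCountedFSOutputStream would then need to hand out the staging file it
writes to, so the writer can pass it through here.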

I can pretty easily do this work, but I would be curious which direction the
maintainers would prefer.

Thanks,

Addison!

On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Steffen,
>
> Thanks for reporting this.
>
> Internally, Flink does not keep any open connections to S3. It only
> buffers data internally until it reaches a min-size limit (by default
> 5MB) and then uploads it as a part of an MPU in one go. Given this, I
> will have to dig a bit deeper to see why a connection would time out.
>
> If you are willing to dig into the code, all interactions with S3 pass
> through the S3AccessHelper
> class and its implementation, the HadoopS3AccessHelper. For the buffering
> and uploading logic,
> you could have a look at the S3RecoverableWriter and the
> S3RecoverableFsDataOutputStream.
>
> I will keep looking into it. In the meantime, if you find anything, let us
> know.
>
> Cheers,
> Kostas
>
>
