Hi Addison, Kostas, Steffan,

I am also encountering this exact issue. I cannot find a JIRA ticket on
this, is there some planned work on implementing a fix?

@Addison - Did you manage to find a fix that you could apply without
modifying the Flink codebase? If possible it would be better not patch the
code base and compile a custom image.

Thanks,
Padarn

On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <addis...@gmail.com> wrote:

> Oh this is timely!
>
> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
> feedback there for what I believe to be a confirmed bug)
>
>
> I was just about to open up a flink issue for this after digging (really)
> deep and figuring out the issue over the weekend.
>
> The problem arises due the flink hands input streams to the
> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
> this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> From this, you can see that for (some reason) AWS fails to write a
> multi-part chunk and then tries to reset the input stream in order to retry
> but fails (because the InputStream is not mark-able)
>
> That exception is swallowed (it seems like it should be raised up to
> client, but isn't for an unknown reason). The s3-client then tries to
> repeat the request using it's built in retry logic, however, because the
> InputStream is consumed
> and has no more bytes to write, we never fill up the expected
> content-length that the s3 put request is expecting. Eventually, after it
> hits the max number of retries, it fails and you get the error above.
>
> I just started running a fix for this (which is a hack not the real
> solution) here:
> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>
> This whole thing is documented here:
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>
> However, I found that just using the documented property didn't appear to
> work and I had to wrap the InputStream in the BufferedInputStream for it to
> work.
>
> I think the real fix is either to:
>
> 1. Use the BufferedInputStream but make it configurable
> 2. Refactor S3AccessHelper to have another signature that takes a File
> object and change the RefCountedFSOutputStream to also be able to give a
> reference the the underlying file.
>
> I can pretty easily do this work, but would be curious the direction that
> the maintainers would prefer.
>
> Thanks,
>
> Addison!
>
>
>
>
>
>
> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Steffen,
>>
>> Thanks for reporting this.
>>
>> Internally Flink does not keep any open connections to S3.  It only keeps
>> buffers data internally up
>> till the point they reach a min-size limit (by default 5MB) and then
>> uploads them as a part of
>> an MPU on one go. Given this, I will have to dig a bit dipper to see why
>> a connection would timeout.
>>
>> If you are willing to dig into the code, all interactions with S3 pass
>> through the S3AccessHelper
>> class and its implementation, the HadoopS3AccessHelper. For the buffering
>> and uploading logic,
>> you could have a look at the S3RecoverableWriter and the
>> S3RecoverableFsDataOutputStream.
>>
>> I will keep looking into it. In the meantime, if you find anything let us
>> know.
>>
>> Cheers,
>> Kostas
>>
>>

-- 
_Grab is hiring. Learn more at *https://grab.careers 
<https://grab.careers/>*_


By communicating with Grab Inc and/or its 
subsidiaries, associate companies and jointly controlled entities (“Grab 
Group”), you are deemed to have consented to processing of your personal 
data as set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/ <https://grab.com/privacy/>


This email contains 
confidential information and is only for the intended recipient(s). If you 
are not the intended recipient(s), please do not disseminate, distribute or 
copy this email and notify Grab Group immediately if you have received this 
by mistake and delete this email from your system. Email transmission 
cannot be guaranteed to be secure or error-free as any information therein 
could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or 
contain viruses. Grab Group do not accept liability for any errors or 
omissions in the contents of this email arises as a result of email 
transmission. All intellectual property rights in this email and 
attachments therein shall remain vested in Grab Group, unless otherwise 
provided by law.

Reply via email to