Hi Padarn,

This is the JIRA issue: https://issues.apache.org/jira/browse/FLINK-11187
and the fix, as you can see, was first included in version 1.7.2.

Cheers,
Kostas

On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <padarn.wil...@grab.com>
wrote:

> Hi Addison, Kostas, Steffen,
>
> I am also encountering this exact issue. I cannot find a JIRA ticket for
> it; is there any planned work on a fix?
>
> @Addison - Did you manage to find a fix that you could apply without
> modifying the Flink codebase? If possible, it would be better not to patch
> the code base and compile a custom image.
>
> Thanks,
> Padarn
>
> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <addis...@gmail.com> wrote:
>
>> Oh this is timely!
>>
>> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
>> feedback there for what I believe to be a confirmed bug)
>>
>>
>> I was just about to open up a flink issue for this after digging (really)
>> deep and figuring out the issue over the weekend.
>>
>> The problem arises from the way Flink hands input streams to the
>> S3AccessHelper. If you turn on debug logs for S3, you will eventually see
>> this stack trace:
>>
>> 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: failed to reset content inputstream before throwing up
>> java.io.IOException: Resetting to invalid mark
>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>   at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> From this, you can see that (for some reason) AWS fails to write a
>> multi-part chunk and then tries to reset the input stream in order to
>> retry, but fails because the InputStream is not markable.
>>
>> That exception is swallowed (it seems like it should be raised up to the
>> client, but for an unknown reason it isn't). The S3 client then tries to
>> repeat the request using its built-in retry logic. However, because the
>> InputStream is already consumed and has no more bytes to write, we never
>> fill up the content-length that the S3 put request is expecting.
>> Eventually, after it hits the max number of retries, it fails and you get
>> the error above.
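To illustrate the failure mode described above, here is a minimal sketch using only JDK classes. The names `ConsumedStreamRetryDemo`, `nonMarkable`, `drain`, and `attemptTwice` are hypothetical and do not come from Flink or the AWS SDK; the "send" is just a loop that counts bytes. It assumes a stream that cannot be rewound and a caller that ignores the failed reset, which is how the thread describes the client's behaviour.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ConsumedStreamRetryDemo {

    /** Wraps the data in a stream that refuses mark/reset (illustrative stand-in). */
    static InputStream nonMarkable(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override public boolean markSupported() { return false; }
            @Override public void mark(int readLimit) { /* ignored on purpose */ }
            @Override public void reset() throws IOException {
                throw new IOException("mark/reset not supported");
            }
        };
    }

    /** Hypothetical "send": reads up to contentLength bytes and reports how many it got. */
    static long drain(InputStream in, long contentLength) {
        try {
            long sent = 0;
            while (sent < contentLength && in.read() != -1) {
                sent++;
            }
            return sent;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the number of bytes each of two attempts could read from the same stream. */
    static long[] attemptTwice(byte[] part) {
        InputStream in = nonMarkable(part);
        long first = drain(in, part.length);   // attempt 1 consumes the stream, then "fails"
        try {
            in.reset();                        // the rewind a retry would need
        } catch (IOException swallowed) {
            // assumption: the reset failure is only logged and the retry goes ahead
        }
        long second = drain(in, part.length);  // attempt 2 has nothing left to read
        return new long[] {first, second};
    }

    public static void main(String[] args) {
        long[] sent = attemptTwice(new byte[1024]);
        System.out.println("attempt 1: " + sent[0] + " bytes, attempt 2: " + sent[1] + " bytes");
    }
}
```

In this sketch the second attempt reads zero bytes, so a request that promised the full content-length can never be completed, no matter how many retries follow.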
>>
>> I just started running a fix for this (which is a hack, not the real
>> solution) here:
>> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>>
>> This whole thing is documented here:
>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>>
>> However, I found that just using the documented property didn't appear to
>> work and I had to wrap the InputStream in the BufferedInputStream for it to
>> work.
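The wrapping approach can be sketched with the JDK alone. This is not the code from the gist; `BufferedResetDemo` and `canReplay` are hypothetical names, and the buffer size and read limits are arbitrary illustration values. The sketch assumes the whole part is read before the rewind, so the read limit passed to `mark` has to cover the part. It uses one byte more than the part length so that the end-of-stream probe stays inside the limit.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class BufferedResetDemo {

    /**
     * Marks a buffered stream with the given read limit, consumes the whole
     * part, then tries to rewind. Returns true only if the rewind succeeded
     * and the part could be read again in full.
     */
    static boolean canReplay(byte[] part, int bufferSize, int readLimit) {
        try (InputStream in =
                 new BufferedInputStream(new ByteArrayInputStream(part), bufferSize)) {
            in.mark(readLimit);
            long first = drain(in);
            try {
                in.reset();
            } catch (IOException invalidMark) {
                // more than readLimit bytes were read, so the mark was dropped
                return false;
            }
            return first == part.length && drain(in) == part.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads to end-of-stream and returns the number of bytes seen. */
    private static long drain(InputStream in) throws IOException {
        long n = 0;
        while (in.read() != -1) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        byte[] part = new byte[100];
        System.out.println("limit too small: " + canReplay(part, 8, 16));
        System.out.println("limit covers part: " + canReplay(part, 8, part.length + 1));
    }
}
```

With a read limit smaller than the part, the reset fails in the same way as in the stack trace; with a limit that covers the part, the retry can replay every byte. The cost is that the buffer may grow to the size of a full part.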
>>
>> I think the real fix is either to:
>>
>> 1. Use the BufferedInputStream but make it configurable
>> 2. Refactor S3AccessHelper to have another signature that takes a File
>> object and change the RefCountedFSOutputStream to also be able to give a
>> reference to the underlying file.
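The idea behind option 2 can be sketched as follows: when the retry logic holds a File rather than an InputStream, each attempt can open a fresh stream and no mark/reset is needed. Everything here is hypothetical (`FileBackedRetryDemo`, `PartSender`, `uploadWithRetry`); it is not the S3AccessHelper signature, only a self-contained illustration with a simulated failure on the first attempt.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class FileBackedRetryDemo {

    /** Hypothetical uploader that receives a fresh stream on every attempt. */
    interface PartSender {
        void send(InputStream in, long length) throws IOException;
    }

    /** Retries by reopening the file; returns the attempt number that succeeded. */
    static int uploadWithRetry(File part, PartSender sender, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (InputStream in = new FileInputStream(part)) {
                sender.send(in, part.length());
                return attempt;
            } catch (IOException e) {
                last = e; // remember the failure and try again with a new stream
            }
        }
        throw new UncheckedIOException("all attempts failed", last);
    }

    /** Simulates one failed attempt; returns {attempts used, bytes seen on the last attempt}. */
    static long[] demo(int partSize) {
        try {
            File part = File.createTempFile("part", ".bin");
            part.deleteOnExit();
            Files.write(part.toPath(), new byte[partSize]);
            long[] lastSeen = {0};
            int[] calls = {0};
            int attempts = uploadWithRetry(part, (in, length) -> {
                long n = 0;
                while (in.read() != -1) {
                    n++;
                }
                lastSeen[0] = n;
                if (++calls[0] == 1) {
                    throw new IOException("simulated failure after consuming the stream");
                }
            }, 3);
            return new long[] {attempts, lastSeen[0]};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = demo(1024);
        System.out.println("succeeded on attempt " + result[0] + " with " + result[1] + " bytes");
    }
}
```

Compared with the buffered wrapper, this variant keeps no copy of the part in memory, at the price of needing access to the underlying file.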
>>
>> I can pretty easily do this work, but would be curious which direction
>> the maintainers would prefer.
>>
>> Thanks,
>>
>> Addison!
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Steffen,
>>>
>>> Thanks for reporting this.
>>>
>>> Internally, Flink does not keep any open connections to S3. It only
>>> buffers data internally up to the point where it reaches a min-size
>>> limit (by default 5MB) and then uploads it as a part of an MPU in one
>>> go. Given this, I will have to dig a bit deeper to see why a connection
>>> would time out.
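The buffering behaviour described above can be pictured with a small sketch. It is not the S3RecoverableFsDataOutputStream implementation: `PartBufferSketch` is a hypothetical class that buffers in memory for brevity and only records the size of each part it would upload, instead of calling S3.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

final class PartBufferSketch {
    private final int minPartSize;
    private final ByteArrayOutputStream current = new ByteArrayOutputStream();
    private final List<Integer> uploadedPartSizes = new ArrayList<>();

    PartBufferSketch(int minPartSize) {
        if (minPartSize < 1) {
            throw new IllegalArgumentException("minPartSize must be positive");
        }
        this.minPartSize = minPartSize;
    }

    /** Buffers the data and "uploads" one part as soon as the threshold is reached. */
    void write(byte[] data) {
        current.write(data, 0, data.length);
        if (current.size() >= minPartSize) {
            uploadCurrentPart();
        }
    }

    /** Flushes whatever is left as the final, possibly smaller, part. */
    void close() {
        if (current.size() > 0) {
            uploadCurrentPart();
        }
    }

    private void uploadCurrentPart() {
        // stand-in for a single upload-part request; no connection is held open between parts
        uploadedPartSizes.add(current.size());
        current.reset();
    }

    /** Writes chunkCount chunks of chunkSize bytes and returns the resulting part sizes. */
    static List<Integer> simulate(int minPartSize, int chunkSize, int chunkCount) {
        PartBufferSketch buffer = new PartBufferSketch(minPartSize);
        for (int i = 0; i < chunkCount; i++) {
            buffer.write(new byte[chunkSize]);
        }
        buffer.close();
        return new ArrayList<>(buffer.uploadedPartSizes);
    }

    public static void main(String[] args) {
        int mb = 1024 * 1024;
        System.out.println(simulate(5 * mb, mb, 12));
    }
}
```

Each part is handed over in one go once the threshold is crossed, which is why a long-lived connection is not expected in this design.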
>>>
>>> If you are willing to dig into the code, all interactions with S3 pass
>>> through the S3AccessHelper
>>> class and its implementation, the HadoopS3AccessHelper. For the
>>> buffering and uploading logic,
>>> you could have a look at the S3RecoverableWriter and the
>>> S3RecoverableFsDataOutputStream.
>>>
>>> I will keep looking into it. In the meantime, if you find anything let
>>> us know.
>>>
>>> Cheers,
>>> Kostas
>>>
>>>


-- 

Kostas Kloudas | Software Engineer

