Hi Padarn,

This is the JIRA issue: https://issues.apache.org/jira/browse/FLINK-11187
and the fix, as you can see, was first included in version 1.7.2.
Cheers,
Kostas

On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <padarn.wil...@grab.com> wrote:

> Hi Addison, Kostas, Steffan,
>
> I am also encountering this exact issue. I cannot find a JIRA ticket on
> this; is there some planned work on implementing a fix?
>
> @Addison - Did you manage to find a fix that you could apply without
> modifying the Flink codebase? If possible, it would be better not to patch
> the code base and compile a custom image.
>
> Thanks,
> Padarn
>
> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <addis...@gmail.com> wrote:
>
>> Oh, this is timely!
>>
>> I hope I can save you some pain, Kostas! (cc-ing flink dev to get
>> feedback there on what I believe to be a confirmed bug)
>>
>> I was just about to open up a Flink issue for this after digging (really)
>> deep and figuring out the issue over the weekend.
>>
>> The problem arises from the way Flink hands input streams to the
>> S3AccessHelper. If you turn on debug logs for S3, you will eventually see
>> this stack trace:
>>
>> 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient -
>>   FYI: failed to reset content inputstream before throwing up
>> java.io.IOException: Resetting to invalid mark
>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>   at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> From this, you can see that (for some reason) AWS fails to write a
>> multi-part chunk and then tries to reset the input stream in order to
>> retry, but fails (because the InputStream is not mark-able).
>>
>> That exception is swallowed (it seems like it should be raised up to the
>> client, but isn't, for an unknown reason). The S3 client then tries to
>> repeat the request using its built-in retry logic. However, because the
>> InputStream is already consumed and has no more bytes to write, we never
>> fill up the content-length that the S3 put request is expecting.
>> Eventually, after it hits the max number of retries, it fails and you get
>> the error above.
>>
>> I just started running a fix for this (which is a hack, not the real
>> solution) here:
>> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>>
>> This whole thing is documented here:
>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>>
>> However, I found that just using the documented property didn't appear to
>> work, and I had to wrap the InputStream in a BufferedInputStream for it to
>> work (a short sketch of the mark/reset behaviour follows below).
>>
>> I think the real fix is either to:
>>
>> 1. Use the BufferedInputStream but make it configurable, or
>> 2. Refactor S3AccessHelper to have another signature that takes a File
>> object, and change RefCountedFSOutputStream to also be able to give a
>> reference to the underlying file (a File-based upload sketch also follows
>> below).
>>
>> I can pretty easily do this work, but I would be curious which direction
>> the maintainers would prefer.
>>
>> Thanks,
>>
>> Addison!
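To make the failure mode above concrete, here is a minimal, stand-alone sketch of the JDK mark/reset contract that the retry logic runs into. The file path and mark limit are made up for illustration; this is not the code from the gist:

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class MarkResetSketch {

        public static void main(String[] args) throws IOException {
            // Placeholder path: any local file larger than the mark limit below.
            String path = "/tmp/part-0001.tmp";

            // A raw FileInputStream does not support mark/reset, so a retrying
            // HTTP client cannot rewind it after a failed upload attempt.
            try (InputStream raw = new FileInputStream(path)) {
                System.out.println("raw markSupported:      " + raw.markSupported());      // false
            }

            // A BufferedInputStream supports mark/reset, but only within the mark
            // limit: once more bytes than the limit have been read, reset() throws
            // "Resetting to invalid mark" - the exception in the stack trace above.
            try (InputStream buffered = new BufferedInputStream(new FileInputStream(path))) {
                System.out.println("buffered markSupported: " + buffered.markSupported()); // true
                buffered.mark(128 * 1024);             // small limit, purely as an example
                byte[] chunk = new byte[8192];
                while (buffered.read(chunk) != -1) { } // first (failed) attempt consumes the stream
                buffered.reset();                      // throws if more than the limit was read
            }
        }
    }

A retry can only rewind as far as the buffering in front of the stream allows, so a wrapper like this only helps if its mark limit covers an entire part (up to the 5MB buffer size mentioned further down the thread), which is presumably why the documented property alone did not help here.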
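Option 2 above amounts to handing the SDK something it can re-read from the beginning on every retry. A rough, hypothetical sketch of a File-based part upload with the AWS SDK for Java v1 follows; the bucket name, key, upload id, and file path are placeholders, inside the Flink filesystems these classes are relocated under org.apache.flink.fs.s3base.shaded.com.amazonaws, and the actual S3AccessHelper signature may look different:

    import java.io.File;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.services.s3.model.UploadPartResult;

    public class FileBasedPartUpload {

        public static void main(String[] args) {
            AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
            File partFile = new File("/tmp/part-0001.tmp");   // placeholder path

            // Giving the SDK the File (rather than an InputStream) means it can
            // reopen and re-read the data on its internal retries, so a failed
            // attempt no longer leaves a half-consumed, non-resettable stream.
            UploadPartRequest request = new UploadPartRequest()
                .withBucketName("my-bucket")                  // placeholder
                .withKey("checkpoints/part-object")           // placeholder
                .withUploadId("example-upload-id")            // placeholder
                .withPartNumber(1)
                .withFile(partFile)
                .withPartSize(partFile.length());

            UploadPartResult result = s3.uploadPart(request);
            System.out.println("Uploaded part 1, ETag: " + result.getPartETag().getETag());
        }
    }

Because the request carries a File rather than an InputStream, the SDK can seek back to the start of the part on each internal retry, so the consumed-stream problem described above cannot occur.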
>> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Hi Steffen,
>>>
>>> Thanks for reporting this.
>>>
>>> Internally, Flink does not keep any open connections to S3. It only
>>> buffers data internally until it reaches a min-size limit (by default
>>> 5MB) and then uploads it as a part of an MPU in one go. Given this, I
>>> will have to dig a bit deeper to see why a connection would time out.
>>>
>>> If you are willing to dig into the code, all interactions with S3 pass
>>> through the S3AccessHelper class and its implementation, the
>>> HadoopS3AccessHelper. For the buffering and uploading logic, you could
>>> have a look at the S3RecoverableWriter and the
>>> S3RecoverableFsDataOutputStream.
>>>
>>> I will keep looking into it. In the meantime, if you find anything, let
>>> us know.
>>>
>>> Cheers,
>>> Kostas
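For context, the upload path described above boils down to the standard S3 multipart upload (MPU) flow. A simplified, stand-alone sketch with the AWS SDK for Java v1 is below; the bucket, key, and file path are placeholders, and the real logic lives in the S3AccessHelper / HadoopS3AccessHelper and S3RecoverableWriter classes mentioned above:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.services.s3.model.UploadPartResult;

    public class MultipartUploadSketch {

        public static void main(String[] args) {
            AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
            String bucket = "my-bucket";                       // placeholder
            String key = "checkpoints/part-object";            // placeholder

            // 1. Start the multipart upload and remember its upload id.
            InitiateMultipartUploadResult init =
                s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key));

            // 2. Upload each buffered chunk (at least 5 MB, except the last one)
            //    as a separate part and collect the returned ETags.
            List<PartETag> etags = new ArrayList<>();
            File part = new File("/tmp/buffered-part-0001.tmp"); // placeholder path
            UploadPartResult result = s3.uploadPart(new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(init.getUploadId())
                .withPartNumber(1)
                .withFile(part)
                .withPartSize(part.length()));
            etags.add(result.getPartETag());

            // 3. Complete the MPU once all parts are uploaded; S3 holds the parts
            //    server-side in the meantime, so nothing stays open in between.
            s3.completeMultipartUpload(
                new CompleteMultipartUploadRequest(bucket, key, init.getUploadId(), etags));
        }
    }

Each buffered chunk becomes one uploadPart request and nothing is kept open between calls, which matches the point above that Flink holds no long-lived connection to S3 between part uploads.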