Hi Rong,

We are using BucketingSink only. I'm looking at the case where the TM does not get a chance to call Writer#flush, e.g. when YARN kills the TM because of OOM. We have configured fs.s3.impl to com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so BucketingSink uses the S3 client internally.
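Concretely, we wire the sink up roughly like this (a minimal sketch, not our exact job; the bucket path, the stand-in source, and the batch size below are only illustrative):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                         // checkpoint every minute
DataStream<String> clickStream =
    env.socketTextStream("localhost", 9999);             // stand-in for our Kafka source

BucketingSink<String> sink =
    new BucketingSink<>("s3://my-bucket/clickstream");   // hypothetical bucket path
sink.setBatchSize(128L * 1024 * 1024);                   // roll a new part file at ~128MB
clickStream.addSink(sink);

env.execute("clickstream-to-s3");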
When we write data using the S3A client, it buffers the data in memory or on local disk until it either reaches the multipart part size or close() is called on the OutputStream. Now suppose the S3A client has buffered 40MB of data on the TM's local disk when a checkpoint barrier arrives at the sink and the checkpoint completes successfully. The sink resumes writing, the buffered data grows to 60MB, and then YARN kills the TM. What would happen to the original 40MB of data? (A rough sketch of the S3A settings I am referring to is at the bottom of this mail, below the quoted thread.)

--
Thanks,
Amit

On Thu, May 17, 2018 at 10:28 PM, Rong Rong <walter...@gmail.com> wrote:
> Hi Amit,
>
> Can you elaborate on how you write using the "S3 sink" and which version of Flink
> you are using?
>
> If you are using BucketingSink [1], you can check out the API doc and
> configure it to flush before closing your sink.
> That way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics" [2].
>
> Thanks,
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and push
>> it to S3 in 128MB files.
>>
>> What are the message processing guarantees with the S3 sink? In my
>> understanding, the S3A client buffers the data in memory/on disk. In a failure
>> scenario on a particular node, the TM would not trigger Writer#close, so the
>> buffered data can be lost entirely, even if that buffer contains data
>> covered by the last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
>
>
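P.S. The S3A buffering behaviour I described above is controlled by Hadoop settings along these lines (a rough sketch for illustration only; we actually run EmrFileSystem, and these property values are examples rather than our production configuration):

import org.apache.hadoop.conf.Configuration;

Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.s3a.fast.upload", "true");          // buffer output locally, upload asynchronously
hadoopConf.set("fs.s3a.fast.upload.buffer", "disk");   // buffer on the TM's local disk
hadoopConf.set("fs.s3a.buffer.dir", "/tmp/s3a");       // directory holding the local buffer files
hadoopConf.set("fs.s3a.multipart.size", "67108864");   // a part is uploaded only once ~64MB has been buffered

Until one of those thresholds is reached (or the output stream is closed), the data exists only on the TM's local disk, which is exactly the window I am worried about.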