Hi Rong,

We are using BucketingSink only. I'm looking at the case where the TM
does not get a chance to call Writer#flush, e.g. when YARN kills the TM
because of OOM. We have configured fs.s3.impl to
com.amazon.ws.emr.hadoop.fs.EmrFileSystem in core-site.xml, so
BucketingSink is using the S3 client internally.
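
For context, the sink wiring looks roughly like this (the bucket path and
values are illustrative; env is our StreamExecutionEnvironment and clicks
is the Kafka-sourced stream):

    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    // Checkpointing is enabled in the job.
    env.enableCheckpointing(60_000);

    // Illustrative path; the real one points at our S3 bucket.
    BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/clicks");
    sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
    sink.setWriter(new StringWriter<>());
    sink.setBatchSize(128L * 1024 * 1024); // roll part files at ~128 MB

    clicks.addSink(sink);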

When we write data using the S3A client, it buffers the data in memory
or on disk until it hits the multipart file size or the OutputStream is
closed. Now suppose the S3A client has buffered 40MB of data on the TM's
local disk, and at the same time a checkpoint barrier arrives at the
sink and the checkpoint completes successfully. The sink resumes
writing, the buffered data grows to 60MB, and then YARN kills the TM.
What would happen to the original 40MB of data?
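
To make the concern concrete, here is a minimal sketch of the behaviour
I am describing, written against the plain Hadoop S3A client (path, part
size and payload are illustrative):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    // S3A stages data locally (or in memory) and only uploads a part once
    // fs.s3a.multipart.size bytes have accumulated, or the stream is closed.
    conf.set("fs.s3a.multipart.size", String.valueOf(64L * 1024 * 1024));

    FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
    byte[] payload = new byte[40 * 1024 * 1024]; // the 40MB from the example above
    try (FSDataOutputStream out = fs.create(new Path("s3a://my-bucket/clicks/part-0"))) {
        out.write(payload); // still sitting in the TM-local buffer, below the part size
        out.hflush();       // on S3A this does not make the data durable in S3
        // Only close(), or crossing the part size, uploads the buffered bytes.
        // If YARN kills the TM at this point, the buffered data is gone.
    }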

--
Thanks,
Amit




On Thu, May 17, 2018 at 10:28 PM, Rong Rong <walter...@gmail.com> wrote:
> Hi Amit,
>
> Can you elaborate on how you write using the "S3 sink" and which version of
> Flink you are using?
>
> If you are using BucketingSink [1], you can check out the API doc and
> configure it to flush before closing your sink.
> This way your sink is "integrated with the checkpointing mechanism to
> provide exactly once semantics" [2].
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html
>
> On Thu, May 17, 2018 at 2:57 AM, Amit Jain <aj201...@gmail.com> wrote:
>>
>> Hi,
>>
>> We are using Flink to process clickstream data from Kafka and push it
>> into 128MB files in S3.
>>
>> What are the message processing guarantees with the S3 sink? In my
>> understanding, the S3A client buffers the data in memory/on disk. In a
>> failure scenario on a particular node, the TM would not trigger
>> Writer#close, so the buffered data could be lost entirely, assuming this
>> buffer contains data from the last successful checkpoint.
>>
>> --
>> Thanks,
>> Amit
>
>
