Hi Ying,

Thanks for using the StreamingFileSink.
The reason why the StreamingFileSink only supports the OnCheckpointRollingPolicy with bulk formats is that Flink currently relies on the Hadoop writer for Parquet. Bulk formats keep important details about how they write the actual data (such as compression schemes, offsets, etc.) in metadata, and they write this metadata with the file (e.g. Parquet writes it as a footer). The Hadoop writer gives no access to this metadata. Given this, there is no way for Flink to safely checkpoint a part file without closing it.

The solution would be to write our own writer rather than going through the Hadoop one, but as far as I know there are no concrete plans for this.

I hope this explains a bit more why the StreamingFileSink has this limitation.

Cheers,
Kostas

On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:

> Dear Flink community:
>
> We have a use case where StreamingFileSink
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
> is used for persisting bulk-encoded data to AWS S3. In our case, the data
> sources consist of hybrid types of events, and each type is uploaded
> to an individual S3 prefix location. Because event sizes are highly
> skewed, the uploaded file sizes may differ dramatically. In order to have
> better control over the uploaded file size, we would like to adopt a
> rolling policy based on file size (e.g., roll the file every 100 MB). Yet
> it appears the bulk-encoding StreamingFileSink only supports
> checkpoint-based file rolling:
>
> IMPORTANT: Bulk-encoding formats can only be combined with the
> `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every
> checkpoint.
>
> Checkpoint-based file rolling appears to have other side effects. For
> instance, quite a lot of the heavy lifting (e.g. uploading file parts) is
> performed at checkpointing time. As a result, checkpointing takes longer
> when the data volume is high.
> Having a customized file rolling policy can be achieved with small
> adjustments to the BulkFormatBuilder interface in StreamingFileSink. In the
> case of the S3RecoverableWriter, file rolling triggers data uploading, and
> the corresponding S3Committer is also constructed and stored. Hence, on the
> surface, adding a simple file-size-based rolling policy would NOT
> compromise the established exactly-once guarantee.
>
> Any advice on whether the above idea makes sense? Or perhaps there are
> pitfalls one might need to pay attention to when introducing such a
> rolling policy. Thanks a lot!
>
> -
> Ying
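For readers following the thread, the size-based rolling policy Ying proposes can be sketched roughly as below. This is a minimal, self-contained illustration: the `RollingPolicy` and `PartFileInfo` interfaces here are simplified stand-ins that only mirror the shape of Flink's real interfaces (in `org.apache.flink.streaming.api.functions.sink.filesystem`), and the class name `CheckpointAndSizeRollingPolicy` is hypothetical, not an existing Flink class.

```java
// Simplified stand-ins for Flink's PartFileInfo / RollingPolicy interfaces;
// NOT the real Flink API, just enough structure to show the idea.
interface PartFileInfo {
    long getSize(); // bytes written so far to the in-progress part file
}

interface RollingPolicy<IN> {
    boolean shouldRollOnCheckpoint(PartFileInfo partFile);
    boolean shouldRollOnEvent(PartFileInfo partFile, IN element);
}

/**
 * Rolls on every checkpoint (which bulk formats require today, since the
 * writer must be closed so the footer metadata gets written) AND also rolls
 * once the in-progress part file exceeds a size threshold.
 */
class CheckpointAndSizeRollingPolicy<IN> implements RollingPolicy<IN> {
    private final long maxPartSizeBytes;

    CheckpointAndSizeRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo partFile) {
        return true; // bulk writers must still be closed at checkpoints
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo partFile, IN element) {
        return partFile.getSize() >= maxPartSizeBytes; // e.g. 100 MB
    }
}

public class RollingPolicySketch {
    public static void main(String[] args) {
        CheckpointAndSizeRollingPolicy<String> policy =
                new CheckpointAndSizeRollingPolicy<>(100L * 1024 * 1024);

        PartFileInfo small = () -> 10L * 1024 * 1024;  // 10 MB in progress
        PartFileInfo large = () -> 150L * 1024 * 1024; // 150 MB in progress

        System.out.println(policy.shouldRollOnEvent(small, "event")); // false
        System.out.println(policy.shouldRollOnEvent(large, "event")); // true
        System.out.println(policy.shouldRollOnCheckpoint(small));     // true
    }
}
```

Note that, per Kostas's explanation, the size-based check alone is not sufficient for bulk formats: the checkpoint-time roll stays mandatory, so a size threshold can only add extra rolls between checkpoints, not replace the on-checkpoint roll.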