Hi Ying,

Thanks for using the StreamingFileSink.

The StreamingFileSink only supports the OnCheckpointRollingPolicy with bulk
formats because Flink currently relies on the Hadoop writer for Parquet.

Bulk formats keep important details about how they write the actual data
(such as compression schemes, offsets, etc.) in metadata, and they write
this metadata with the file (e.g. Parquet writes it as a footer). The
Hadoop writer gives no access to this metadata. Given this, there is no way
for Flink to checkpoint a part file safely without closing it.
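
To make this concrete, here is a minimal sketch of a footer-based format in
plain Java. It is a toy, not Parquet and not any Flink or Hadoop API: the
names FooterFormatSketch, ToyBulkWriter, bytesSoFar and MAGIC are all made
up for illustration. It only shows that the bytes written before close()
are not a readable file, because the offsets live in the writer's private
state until the footer is appended.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy footer-based format for illustration only (hypothetical, not Parquet). */
class FooterFormatSketch {

    // Marker at the very end of a properly closed file (ASCII "TOY1").
    static final int MAGIC = 0x544F5931;

    /** Hypothetical writer: data goes out immediately, offsets stay in memory. */
    static final class ToyBulkWriter {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Metadata that callers cannot see, like the Hadoop writer's state.
        private final List<Integer> offsets = new ArrayList<>();

        void add(String record) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            offsets.add(out.size());
            out.write(bytes, 0, bytes.length);
        }

        /** What could be captured without closing: the data bytes only. */
        byte[] bytesSoFar() {
            return out.toByteArray();
        }

        /** Closing appends the footer: every offset, the record count, MAGIC. */
        byte[] close() {
            ByteBuffer footer = ByteBuffer.allocate(4 * offsets.size() + 8);
            for (int offset : offsets) {
                footer.putInt(offset);
            }
            footer.putInt(offsets.size());
            footer.putInt(MAGIC);
            out.write(footer.array(), 0, footer.capacity());
            return out.toByteArray();
        }
    }

    /** Reads the records back via the footer; throws if the footer is missing. */
    static List<String> read(byte[] file) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        if (file.length < 8 || buf.getInt(file.length - 4) != MAGIC) {
            throw new IllegalStateException("missing footer: file was never closed");
        }
        int count = buf.getInt(file.length - 8);
        int footerStart = file.length - 8 - 4 * count;
        List<String> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int start = buf.getInt(footerStart + 4 * i);
            // A record ends where the next one starts, or where the footer begins.
            int end = (i + 1 < count) ? buf.getInt(footerStart + 4 * (i + 1)) : footerStart;
            records.add(new String(file, start, end - start, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        ToyBulkWriter writer = new ToyBulkWriter();
        writer.add("alpha");
        writer.add("beta");
        try {
            read(writer.bytesSoFar());
        } catch (IllegalStateException e) {
            System.out.println("in-progress file: " + e.getMessage());
        }
        System.out.println("closed file: " + read(writer.close()));
    }
}
```

In this toy, the only way to get a readable file is to call close(), which
is roughly the position the sink is in with the Hadoop writer: rolling the
part file on every checkpoint is how it gets a consistent file to commit.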

The solution would be to write our own writer instead of going through the
Hadoop one, but there are no concrete plans for this, as far as I know.

I hope this explains a bit more why the StreamingFileSink has this
limitation.

Cheers,
Kostas


On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:

> Dear Flink community:
>
> We have a use case where StreamingFileSink
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
> is used for persisting bulk-encoded data to AWS S3. In our case, the data
> sources consist of hybrid types of events, and each type is uploaded
> to an individual S3 prefix location. Because the event size is highly
> skewed, the uploaded file size may differ dramatically. In order to have
> better control over the uploaded file size, we would like to adopt a
> rolling policy based on file sizes (e.g., roll the file every 100MB). Yet
> it appears bulk-encoding StreamingFileSink only supports checkpoint-based
> file rolling.
>
> IMPORTANT: Bulk-encoding formats can only be combined with the
> `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every
> checkpoint.
>
> Checkpoint-based file rolling appears to have other side effects. For
> instance, quite a lot of the heavy lifting (e.g. uploading file parts) is
> performed at checkpointing time. As a result, checkpointing takes
> longer when data volume is high.
>
> A customized file rolling policy can be achieved with small
> adjustments to the BulkFormatBuilder interface in StreamingFileSink. In the
> case of using S3RecoverableWriter, file rolling triggers data uploading, and
> the corresponding S3Committer is also constructed and stored. Hence, on the
> surface, adding a simple file-size based rolling policy would NOT
> compromise the established exactly-once guarantee.
>
> Any advice on whether the above idea makes sense? Or perhaps there are
> pitfalls that one should pay attention to when introducing such a rolling
> policy. Thanks a lot!
>
>
> -
> Ying
>
