Hi Ying,

You are right! If it is either on checkpoint or on size, then this is
doable even with the current state of things.
Could you open a JIRA so that we can keep track of the progress?

Cheers,
Kostas

On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <y...@lyft.com.invalid> wrote:

> HI Kostas:
>
> Thanks for the prompt reply.
>
> The file rolling policy mentioned previously is meant to roll files EITHER
> when a size limited is reached, OR when a checkpoint happens.  Looks like
> every time a file is rolled, the part file is closed
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218
> >,
> during which file is closed with a committable returned
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240
> >.
> I assume it is during closeForCommit() when the Parquet file metatdata is
> written.  At a first glance, the code path of file rolling looks very
> similar to that inside prepareBucketForCheckpointing()
> <
> https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275
> >.
> Not sure if I miss anything there.
>
>
> -
> Ying
>
>
> On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
> > Hi Ying,
> >
> > Thanks for using the StreamingFileSink.
> >
> > The reason why the StreamingFileSink only supports
> > OnCheckpointRollingPolicy with bulk
> > formats has to do with the fact that currently Flink relies on the Hadoop
> > writer for Parquet.
> >
> > Bulk formats keep important details about how they write the actual data
> > (such as compression
> > schemes, offsets, etc) in metadata and they write this metadata with the
> > file (e.g. parquet writes
> > them as a footer). The hadoop writer gives no access to these metadata.
> > Given this, there is
> > no way for flink to be able to checkpoint a part file securely without
> > closing it.
> >
> > The solution would be to write our own writer and not go through the
> hadoop
> > one, but there
> > are no concrete plans for this, as far as I know.
> >
> > I hope this explains a bit more why the StreamingFileSink has this
> > limitation.
> >
> > Cheers,
> > Kostas
> >
> >
> > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:
> >
> > > Dear Flink community:
> > >
> > > We have a use case where StreamingFileSink
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > >
> > > is used for persisting bulk-encoded data to AWS s3. In our case, the
> data
> > > sources consist of hybrid types of events, for which each type is
> > uploaded
> > > to an individual s3 prefix location. Because the event size is highly
> > > skewed, the uploaded file size may differ dramatically.  In order to
> > have a
> > > better control over the uploaded file size, we would like to adopt a
> > > rolling policy based on file sizes (e.g., roll the file every 100MB).
> Yet
> > > it appears bulk-encoding StreamingFileSink only supports
> checkpoint-based
> > > file rolling.
> > >
> > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > every
> > > checkpoint.
> > >
> > > Checkpoint-based file rolling appears to have other side effects. For
> > > instance, quite a lot of the heavy liftings (e.g file parts uploading)
> > are
> > > performed at the checkpointing time. As a result, checkpointing takes
> > > longer duration when data volume is high.
> > >
> > > Having a customized file rolling policy can be achieved by small
> > > adjustments on the BulkFormatBuilder interface in StreamingFileSink. In
> > the
> > > case of using S3RecoverableWriter, file rolling triggers data uploading
> > and
> > > corresponding S3Committer is also constructed and stored. Hence on the
> > > surface, adding a simple file-size based rolling policy would NOT
> > > compromise the established exact-once guarantee.
> > >
> > > Any advises on whether the above idea makes sense? Or perhaps there are
> > > pitfalls that one might pay attention when introducing such rolling
> > policy.
> > > Thanks a lot!
> > >
> > >
> > > -
> > > Ying
> > >
> >
>

Reply via email to