Hi Ying,

That sounds great!
Looking forward to your PR!

Btw don't you want to assign the issue to yourself if you are
planning to work on it?

Kostas

On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <y...@lyft.com.invalid> wrote:

> Thanks Kostas for confirming!
>
> I've filed an issue, FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027>. We are actively
> working on the interface of such a file rolling policy, and will also
> perform benchmarks when it is integrated with a StreamingFileSink. We are
> more than happy to contribute if there's no other plan to address this
> issue.
>
> Thanks again.
>
> -
> Bests
> Ying
>
>
> On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
> > Hi Ying,
> >
> > You are right! If it is either on checkpoint or on size, then this is
> > doable even with the current state of things.
> > Could you open a JIRA so that we can keep track of the progress?
> >
> > Cheers,
> > Kostas
> >
> > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <y...@lyft.com.invalid> wrote:
> >
> > > Hi Kostas:
> > >
> > > Thanks for the prompt reply.
> > >
> > > The file rolling policy mentioned previously is meant to roll files EITHER
> > > when a size limit is reached, OR when a checkpoint happens.  It looks like
> > > every time a file is rolled, the part file is closed
> > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>,
> > > during which the file is closed and a committable is returned
> > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>.
> > > I assume it is during closeForCommit() that the Parquet file metadata is
> > > written.  At first glance, the code path of file rolling looks very
> > > similar to that inside prepareBucketForCheckpointing()
> > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>.
> > > Not sure if I am missing anything there.
> > >
> > >
> > > -
> > > Ying
> > >
> > >
> > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> > >
> > > > Hi Ying,
> > > >
> > > > Thanks for using the StreamingFileSink.
> > > >
> > > > The reason why the StreamingFileSink only supports
> > > > OnCheckpointRollingPolicy with bulk formats has to do with the fact
> > > > that Flink currently relies on the Hadoop writer for Parquet.
> > > >
> > > > Bulk formats keep important details about how they write the actual
> > > > data (such as compression schemes, offsets, etc.) in metadata, and they
> > > > write this metadata with the file (e.g., Parquet writes it as a
> > > > footer). The Hadoop writer gives no access to this metadata. Given
> > > > this, there is no way for Flink to checkpoint a part file safely
> > > > without closing it.
> > > >
> > > > The solution would be to write our own writer and not go through the
> > > > Hadoop one, but there are no concrete plans for this, as far as I know.
> > > >
> > > > I hope this explains a bit more why the StreamingFileSink has this
> > > > limitation.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > >
> > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:
> > > >
> > > > > Dear Flink community:
> > > > >
> > > > > We have a use case where StreamingFileSink
> > > > > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
> > > > > is used for persisting bulk-encoded data to AWS S3. In our case, the
> > > > > data sources consist of hybrid types of events, and each type is
> > > > > uploaded to an individual S3 prefix location. Because the event size
> > > > > is highly skewed, the uploaded file size may differ dramatically. In
> > > > > order to have better control over the uploaded file size, we would
> > > > > like to adopt a rolling policy based on file size (e.g., roll the file
> > > > > every 100MB). Yet it appears that the bulk-encoding StreamingFileSink
> > > > > only supports checkpoint-based file rolling.
> > > > >
> > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
> > > > > every checkpoint.
> > > > >
> > > > > Checkpoint-based file rolling appears to have other side effects. For
> > > > > instance, much of the heavy lifting (e.g., uploading file parts) is
> > > > > performed at checkpointing time. As a result, checkpointing takes
> > > > > longer when data volume is high.
> > > > >
> > > > > A customized file rolling policy can be achieved by small adjustments
> > > > > to the BulkFormatBuilder interface in StreamingFileSink. When
> > > > > S3RecoverableWriter is used, file rolling triggers data uploading, and
> > > > > the corresponding S3Committer is also constructed and stored. Hence,
> > > > > on the surface, adding a simple file-size-based rolling policy would
> > > > > NOT compromise the established exactly-once guarantee.
> > > > >
> > > > > Any advice on whether the above idea makes sense? Or perhaps there are
> > > > > pitfalls that one should pay attention to when introducing such a
> > > > > rolling policy. Thanks a lot!
> > > > >
> > > > >
> > > > > -
> > > > > Ying
> > > > >
> > > >
> > >
> >
>
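
For reference, the rolling behavior discussed in this thread (roll on every
checkpoint, or earlier once a part file reaches a size limit) can be sketched
in a few lines of Java. This is a minimal standalone sketch, not Flink code:
PartFileState, SizeOrCheckpointRollingPolicy, and RollingPolicyDemo are
hypothetical names, the method names are only loosely modeled on rolling-policy
callbacks, and the small byte limit in the demo stands in for the 100MB
mentioned in the original question.

```java
/** Hypothetical stand-in for the state of an in-progress part file. */
final class PartFileState {
    private long sizeBytes;

    long getSizeBytes() {
        return sizeBytes;
    }

    void append(long bytes) {
        sizeBytes += bytes;
    }
}

/**
 * Hypothetical policy: always roll on checkpoint, and also roll as soon as
 * the in-progress part file has reached a configured size limit.
 */
final class SizeOrCheckpointRollingPolicy {
    private final long maxPartSizeBytes;

    SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        if (maxPartSizeBytes <= 0) {
            throw new IllegalArgumentException("maxPartSizeBytes must be positive");
        }
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    /** A checkpoint always closes the current part file. */
    boolean shouldRollOnCheckpoint(PartFileState part) {
        return true;
    }

    /** Between checkpoints, roll only once the size limit is reached. */
    boolean shouldRollOnEvent(PartFileState part) {
        return part.getSizeBytes() >= maxPartSizeBytes;
    }
}

/** Tiny simulation: write events, then take one checkpoint at the end. */
final class RollingPolicyDemo {
    static int countRolls(long[] eventSizes, long limitBytes) {
        SizeOrCheckpointRollingPolicy policy =
                new SizeOrCheckpointRollingPolicy(limitBytes);
        PartFileState part = new PartFileState();
        int rolls = 0;
        for (long size : eventSizes) {
            part.append(size);
            if (policy.shouldRollOnEvent(part)) {
                rolls++;                    // close the part file ...
                part = new PartFileState(); // ... and start a new one
            }
        }
        if (policy.shouldRollOnCheckpoint(part)) {
            rolls++; // the checkpoint closes whatever is still in progress
        }
        return rolls;
    }

    public static void main(String[] args) {
        long[] eventSizes = {40, 40, 40, 10};
        System.out.println("rolled part files: " + countRolls(eventSizes, 100));
    }
}
```

In this sketch the size check runs after each write, so a part file can
overshoot the limit by up to one event. Whether the check belongs before or
after the write is a design choice the thread leaves open.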
