Thanks Ying!

Looking forward to your contribution.

Kostas

On Wed, Jul 3, 2019 at 6:48 PM Ying Xu <y...@lyft.com.invalid> wrote:

> Hi Kostas:
>
> For simplicity, FLINK-13027
> <https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to
> my current user ID. I will contribute using that ID.
>
> Will circulate updates with the community once we have initial success
> with this new rolling policy!
>
> Thank you again.
>
> -
> Ying
>
>
> On Fri, Jun 28, 2019 at 9:51 AM Ying Xu <y...@lyft.com> wrote:
>
> > Hi Kostas:
> >
> > I'd like to.  The account used to file the JIRA does not have contributor
> > access yet.  I have contributed a few Flink JIRAs in the past, using a
> > very similar but different account.  Now I would like to consolidate and
> > use a common account for all Apache project contributions.
> >
> > Would you mind granting me contributor access for the following
> > account?  This way I can assign the JIRA to myself:
> >            yxu-apache
> > <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>
> >
> > Many thanks!
> > -
> > Ying
> >
> >
> > On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >
> >> Hi Ying,
> >>
> >> That sounds great!
> >> Looking forward to your PR!
> >>
> >> Btw don't you want to assign the issue to yourself if you are
> >> planning to work on it?
> >>
> >> Kostas
> >>
> >> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <y...@lyft.com.invalid> wrote:
> >>
> >> > Thanks Kostas for confirming!
> >> >
> >> > I've filed an issue, FLINK-13027
> >> > <https://issues.apache.org/jira/browse/FLINK-13027>.  We are actively
> >> > working on the interface for such a file rolling policy, and will also
> >> > run benchmarks once it is integrated with a StreamingFileSink. We are
> >> > more than happy to contribute if there's no other plan to address this
> >> > issue.
> >> >
> >> > Thanks again.
> >> >
> >> > -
> >> > Best,
> >> > Ying
> >> >
> >> >
> >> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >> >
> >> > > Hi Ying,
> >> > >
> >> > > You are right! If it is either on checkpoint or on size, then this
> >> > > is doable even with the current state of things.
> >> > > Could you open a JIRA so that we can keep track of the progress?
> >> > >
> >> > > Cheers,
> >> > > Kostas
> >> > >
> >> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <y...@lyft.com.invalid> wrote:
> >> > >
> >> > > > Hi Kostas:
> >> > > >
> >> > > > Thanks for the prompt reply.
> >> > > >
> >> > > > The file rolling policy mentioned previously is meant to roll files
> >> > > > EITHER when a size limit is reached, OR when a checkpoint happens.
> >> > > > It looks like every time a file is rolled, the part file is closed
> >> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>,
> >> > > > at which point the file is closed and a committable is returned
> >> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>.
> >> > > > I assume it is during closeForCommit() that the Parquet file
> >> > > > metadata is written.  At first glance, the code path of file rolling
> >> > > > looks very similar to the one inside prepareBucketForCheckpointing()
> >> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>.
> >> > > > Not sure if I'm missing anything there.
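> >> > > >
> >> > > > To make the intent concrete, here is a rough sketch of the policy we
> >> > > > have in mind, written against the existing RollingPolicy interface in
> >> > > > the StreamingFileSink code (the class name and the threshold are
> >> > > > placeholders, not a final design):
> >> > > >
> >> > > >     import java.io.IOException;
> >> > > >     import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
> >> > > >     import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
> >> > > >
> >> > > >     /** Rolls the part file on every checkpoint OR once it grows past a size threshold. */
> >> > > >     public class SizeOrCheckpointRollingPolicy<IN, BucketID>
> >> > > >             implements RollingPolicy<IN, BucketID> {
> >> > > >
> >> > > >         private final long maxPartSizeBytes;
> >> > > >
> >> > > >         public SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
> >> > > >             this.maxPartSizeBytes = maxPartSizeBytes;
> >> > > >         }
> >> > > >
> >> > > >         @Override
> >> > > >         public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
> >> > > >             // Keep the current bulk-format behavior: always roll on checkpoint.
> >> > > >             return true;
> >> > > >         }
> >> > > >
> >> > > >         @Override
> >> > > >         public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
> >> > > >                 throws IOException {
> >> > > >             // Additionally roll as soon as the in-progress part exceeds the size limit.
> >> > > >             return partFileState.getSize() >= maxPartSizeBytes;
> >> > > >         }
> >> > > >
> >> > > >         @Override
> >> > > >         public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState,
> >> > > >                                                   long currentTime) {
> >> > > >             return false;
> >> > > >         }
> >> > > >     }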
> >> > > >
> >> > > >
> >> > > > -
> >> > > > Ying
> >> > > >
> >> > > >
> >> > > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Ying,
> >> > > > >
> >> > > > > Thanks for using the StreamingFileSink.
> >> > > > >
> >> > > > > The reason why the StreamingFileSink only supports
> >> > > > > OnCheckpointRollingPolicy with bulk formats has to do with the fact
> >> > > > > that Flink currently relies on the Hadoop writer for Parquet.
> >> > > > >
> >> > > > > Bulk formats keep important details about how they write the
> >> > > > > actual data (such as compression schemes, offsets, etc.) in
> >> > > > > metadata, and they write this metadata with the file (e.g. Parquet
> >> > > > > writes it as a footer). The Hadoop writer gives no access to this
> >> > > > > metadata. Given this, there is no way for Flink to checkpoint a
> >> > > > > part file safely without closing it.
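> >> > > > >
> >> > > > > For reference, the shape of the BulkWriter interface shows the
> >> > > > > problem (paraphrased from the Flink sources, so treat the exact
> >> > > > > signatures as approximate): finish() is the only way to finalize a
> >> > > > > file, and there is no hook to persist an in-progress file for
> >> > > > > recovery:
> >> > > > >
> >> > > > >     import java.io.IOException;
> >> > > > >
> >> > > > >     // Paraphrased from org.apache.flink.api.common.serialization.BulkWriter.
> >> > > > >     public interface BulkWriter<T> {
> >> > > > >
> >> > > > >         // Appends an element; the format may buffer it internally.
> >> > > > >         void addElement(T element) throws IOException;
> >> > > > >
> >> > > > >         // Flushes buffered data, but does NOT finalize the file.
> >> > > > >         void flush() throws IOException;
> >> > > > >
> >> > > > >         // Finalizes the file, e.g. writes the Parquet footer; nothing
> >> > > > >         // can be appended afterwards. This is why a bulk part file
> >> > > > >         // must be closed before its contents can be committed.
> >> > > > >         void finish() throws IOException;
> >> > > > >     }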
> >> > > > >
> >> > > > > The solution would be to write our own writer and not go through
> >> > > > > the Hadoop one, but there are no concrete plans for this, as far
> >> > > > > as I know.
> >> > > > >
> >> > > > > I hope this explains a bit more why the StreamingFileSink has
> >> > > > > this limitation.
> >> > > > >
> >> > > > > Cheers,
> >> > > > > Kostas
> >> > > > >
> >> > > > >
> >> > > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:
> >> > > > >
> >> > > > > > Dear Flink community:
> >> > > > > >
> >> > > > > > We have a use case where StreamingFileSink
> >> > > > > > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
> >> > > > > > is used for persisting bulk-encoded data to AWS S3. In our case,
> >> > > > > > the data sources consist of mixed types of events, and each type
> >> > > > > > is uploaded to its own S3 prefix. Because the event size is
> >> > > > > > highly skewed, the uploaded file sizes may differ dramatically.
> >> > > > > > In order to have better control over the uploaded file size, we
> >> > > > > > would like to adopt a rolling policy based on file size (e.g.,
> >> > > > > > roll the file every 100 MB).  Yet it appears the bulk-encoding
> >> > > > > > StreamingFileSink only supports checkpoint-based file rolling:
> >> > > > > >
> >> > > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
> >> > > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part
> >> > > > > > file on every checkpoint.
> >> > > > > >
> >> > > > > > Checkpoint-based file rolling also appears to have side effects.
> >> > > > > > For instance, quite a lot of the heavy lifting (e.g. uploading
> >> > > > > > file parts) is performed at checkpointing time. As a result,
> >> > > > > > checkpointing takes longer when the data volume is high.
> >> > > > > >
> >> > > > > > Having a customized file rolling policy can be achieved by small
> >> > > > > > adjustments to the BulkFormatBuilder interface in
> >> > > > > > StreamingFileSink.  In the case of the S3RecoverableWriter, file
> >> > > > > > rolling triggers the data upload, and the corresponding
> >> > > > > > S3Committer is also constructed and stored.  Hence, on the
> >> > > > > > surface, adding a simple file-size-based rolling policy would
> >> > > > > > NOT compromise the established exactly-once guarantee.
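> >> > > > > >
> >> > > > > > To sketch the wiring we have in mind (the withRollingPolicy()
> >> > > > > > call on the bulk-format builder does not exist today; it is
> >> > > > > > exactly the adjustment being proposed, and MyEvent, the S3 path,
> >> > > > > > and the size-or-checkpoint policy are hypothetical placeholders):
> >> > > > > >
> >> > > > > >     import org.apache.flink.core.fs.Path;
> >> > > > > >     import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
> >> > > > > >     import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> >> > > > > >
> >> > > > > >     // Hypothetical: assumes BulkFormatBuilder gains a withRollingPolicy()
> >> > > > > >     // setter instead of hard-coding OnCheckpointRollingPolicy.
> >> > > > > >     StreamingFileSink<MyEvent> sink = StreamingFileSink
> >> > > > > >         .forBulkFormat(
> >> > > > > >             new Path("s3://my-bucket/events"),
> >> > > > > >             ParquetAvroWriters.forReflectRecord(MyEvent.class))
> >> > > > > >         .withRollingPolicy(
> >> > > > > >             // Roll at ~100 MB or, as today, on every checkpoint.
> >> > > > > >             new SizeOrCheckpointRollingPolicy<>(100L * 1024 * 1024))
> >> > > > > >         .build();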
> >> > > > > >
> >> > > > > > Any advice on whether the above idea makes sense?  Or perhaps
> >> > > > > > there are pitfalls one should pay attention to when introducing
> >> > > > > > such a rolling policy.  Thanks a lot!
> >> > > > > >
> >> > > > > >
> >> > > > > > -
> >> > > > > > Ying
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
