Fabian, thanks a lot for the proposal and starting the discussion.

We should probably first describe the different causes of small files and
what problems this proposal is trying to solve. I wrote a data shuffling
proposal [1] for the Flink Iceberg sink (shared with the Iceberg community
[2]). It can address small-file problems caused by skewed data distribution
across Iceberg table partitions. Streaming shuffling before the writers (to
files) is typically more efficient than post-write file compaction (which
involves read-merge-write). It is usually cheaper to prevent a problem
(small files) than to fix it.
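
To make the idea concrete, here is a minimal, hypothetical DataStream sketch
(this is not the API from [1]; the tuple layout and the print() stand-in for
the Iceberg writer are just placeholders) showing the shape of shuffling by
table partition before the parallel writers:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleBeforeWriteSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // f0 = table partition value (e.g. the event date), f1 = payload.
    env.fromElements(
            Tuple2.of("2021-11-04", "a"),
            Tuple2.of("2021-11-04", "b"),
            Tuple2.of("2021-11-05", "c"))
        // Shuffle by the table partition so that each writer subtask receives
        // all rows for "its" partitions and writes fewer, larger files.
        .keyBy(row -> row.f0)
        // Placeholder for the parallel Iceberg writer; print() keeps the
        // sketch runnable without an Iceberg dependency.
        .print();

    env.execute("shuffle-before-write sketch");
  }
}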

It would be great if Flink could address the sink coordinator checkpoint
problem (mentioned in option 1). In the spirit of the source
(enumerator-reader) and sink (writer-coordinator) duality, the sink
coordinator checkpoint should happen after the writer operator. This would
be a natural fit for supporting the global committer in FLIP-143. It is
probably orthogonal to this proposal, though.

Personally, I am usually in favor of keeping streaming ingestion (to the
data lake) relatively simple and stable. Also, compaction and sorting are
sometimes performed together in data rewrite maintenance jobs to improve
read performance. In that case, the value of compacting (in Flink streaming
ingestion) diminishes.

Currently, it is unclear from the doc and this thread where the compaction
actually happens. Jingsong's reply described one model:
writer (parallel) -> aggregator (single-parallelism compaction planner) ->
compactor (parallel) -> global committer (single parallelism)

In the Iceberg community, the following model has been discussed. It is
better for Iceberg because it does not delay data availability:
writer (parallel) -> global committer for append (single parallelism) ->
compactor (parallel) -> global committer for rewrite (single parallelism)
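
For illustration only, here is a rough, runnable DataStream sketch of the
second model (strings stand in for data files and commits, and the operator
names are made up; this is not the Iceberg sink implementation):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CommitThenCompactSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> records = env.fromElements("r1", "r2", "r3");

    // Parallel writers: each record becomes a (small) data file.
    DataStream<String> dataFiles = records
        .map(r -> "file(" + r + ")")
        .name("writer");

    // Single-parallelism committer appends the files to the table, so the
    // data is queryable right away.
    DataStream<String> appendCommits = dataFiles
        .map(f -> "append-commit(" + f + ")")
        .setParallelism(1)
        .name("append-committer");

    // Parallel compactors rewrite small files into larger ones later.
    DataStream<String> compacted = appendCommits
        .rebalance()
        .map(c -> "compacted(" + c + ")")
        .name("compactor");

    // Single-parallelism committer commits the rewrite.
    compacted
        .map(f -> "rewrite-commit(" + f + ")")
        .setParallelism(1)
        .name("rewrite-committer")
        .print();

    env.execute("commit-then-compact sketch");
  }
}

The key property is visible in the wiring: the append commit happens before
the compactor, so data availability is not delayed by the rewrite.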

Thanks,
Steven

[1]
https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
[2] https://www.mail-archive.com/dev@iceberg.apache.org/msg02889.html




On Thu, Nov 4, 2021 at 4:46 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:

> Hi all,
>
> Many thanks to Fabian for drafting the FLIP and for the warm discussion!
>
> I'd like to add some points based on the previous discussion:
>
> 1. Regarding the case of cross-checkpoint merging
>
> I agree that none of the options would block the checkpoint that produces
> the files to commit, but I think Guowei is referring to another issue:
> suppose the files are created in checkpoints 1 to 10 and we want to merge
> the files created across those 10 checkpoints. With an arbitrary topology
> we could merge the files during checkpoints 11 to 20 without blocking the
> following checkpoints. But if the compaction happens in Committer#commit,
> as in option 2, I think that with the current mechanism the commit would
> need to finish before checkpoint 11.
>
> 2. Regarding Commit / Global commit
>
> As a whole, I think whether we have compaction should be independent of
> whether we have the global committer. The global committer was initially
> meant to write the bucket metadata after all the files of a single bucket
> have been committed. Thus:
> a) If there are failures and retries, the global commit should wait.
> b) I think with all the options, the committable should represent a file
> after compaction. It might be either a file that is already compacted or a
> list of small files waiting to be committed. Also, we would not need to
> wait for another checkpoint if we only use it for the meta-writing case.
> Still, I think this behavior does not change depending on whether we have
> compaction.
>
> In fact, in my view a better abstraction might be to remove the
> GlobalCommitter entirely and only have one level of committer. If users
> need to write metadata, then the action of writing the metadata should be
> viewed as the "commit". Users could write the actual files freely; if
> there is a failover, they could simply remove the unnecessary ones, since
> all of these files are not visible yet.
> But this might be a different topic.
>
> 3. Regarding the comparison of the API
>
> I totally agree that for the specific case of compaction, option 2 would
> indeed be easy to use, since we considered this case when we designed the
> new API. But as a whole, from another view, I think writing a stream/batch
> unified program with the DataStream API should not be that hard. It is not
> more difficult than writing a normal stream/batch unified Flink job. For
> the specific issues we mentioned, I think that based on the previous
> discussion we should eventually add `finish()` to the UDFs, and for now we
> could at least first add it to the `ProcessFunction` family.
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Arvid Heise <ar...@apache.org>
> Send Time:2021 Nov. 4 (Thu.) 16:55
> To:Till Rohrmann <trohrm...@apache.org>
> Cc:dev <dev@flink.apache.org>; "David Morávek" <d...@apache.org>
> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
> small file compaction
>
> >
> > Emitting records for downstream operators in or after
> > notifyCheckpointComplete no longer works after FLIP-147 when executing
> > the
> > final checkpoint. The problem is that the final checkpoint happens after
> > the EOI and we would like to keep the property that you can terminate the
> > whole topology with a single checkpoint, if possible.
> >
>
> Sorry for the confusion, I meant to say emit them after the barrier (e.g.
> in snapshotState).
>
> On Thu, Nov 4, 2021 at 9:49 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Thanks for the detailed description, Arvid.
> >
> > I might be misunderstanding things, but one comment concerning:
> >
> > | We could even optimize the writer to only emit the committables after
> > notifyCheckpointComplete as long as we retain them in the state of the
> > writer.
> >
> > Emitting records for downstream operators in or after
> > notifyCheckpointComplete no longer works after FLIP-147 when executing
> > the
> > final checkpoint. The problem is that the final checkpoint happens after
> > the EOI and we would like to keep the property that you can terminate the
> > whole topology with a single checkpoint, if possible.
> >
> > Cheers,
> > Till
> >
> > On Thu, Nov 4, 2021 at 9:05 AM Arvid Heise <ar...@apache.org> wrote:
> >
> >> Hi folks,
> >>
> >> thanks for the lively discussion. Let me present my point of view on a
> >> couple of items:
> >>
> >> *Impact on checkpointing times*
> >>
> >> Currently, we send the committables of the writer downstream before the
> >> barrier is sent. That allows us to include all committables in the state
> >> of the committers, such that the committer receives all committables
> >> before it snapshots.
> >>
> >> All proposals now add a processing step on the committables where certain
> >> committables are virtually merged into larger committables. However, none
> >> of the approaches requires the physical merge to happen before the barrier
> >> is sent downstream. In fact, we can even delay the virtual merge until
> >> after the checkpoint has been fully taken. We could even optimize the
> >> writer to only emit the committables after notifyCheckpointComplete, as
> >> long as we retain them in the state of the writer. The important point is
> >> that the committables become part of the checkpoint, but they are always
> >> committed only after notifyCheckpointComplete. In particular, the retry
> >> mechanism of 1.14 decouples the exact time of the commit from
> >> notifyCheckpointComplete. The retry happens asynchronously, so there is no
> >> reason to believe that we can't do the merging asynchronously with any
> >> option.
> >>
> >> Naturally, all approaches delay the checkpoint barrier a bit by either
> >> adding RPC calls or shuffles, but the impact is rather minimal in a
> >> well-configured system (the number of committables is assumed to be tiny),
> >> so I'm assuming a tad higher checkpointing time because the topology is
> >> more complex (in all cases).
> >>
> >> *Impact on latency*
> >>
> >> All approaches will also delay the effective commit, since additional
> >> work needs to be done, but I'd argue that this is by design and should be
> >> clear to everyone. Especially when merging files across checkpoints,
> >> certain results will not be visible until much later. Once we settle on an
> >> approach, we should think about which options we give to sink developers
> >> and end-users to influence that latency.
> >>
> >> An important aspect here is that we also refine the contract of the
> >> GlobalCommitter. Currently, it's not clear when it is supposed to be
> >> called; for example, to register files in a metastore. Naively, I would
> >> have said that the GlobalCommitter is invoked when all committables of a
> >> certain checkpoint have been committed.
> >> a) But what happens in the case of a failure and retry? Do we delay until
> >> the commit finally happens?
> >> b) What do we do with committables that are held back for compaction? Do
> >> we globally commit when all committables of checkpoint A are committed,
> >> ignoring small files? Or do we wait until a later checkpoint, when all
> >> small files of A have been merged, such that indeed all data of A has been
> >> committed?
> >>
> >> *Re: The cost for developers to understand it is relatively high*
> >>
> >> Yes, that is a valid concern that we already have with the committer and
> >> global committer (which none of the new users understand). I don't like
> >> that we have so many Optional methods where it's not clear which methods
> >> to implement to achieve certain functionality. Ideally, we would split up
> >> the Sink into many smaller components where you add certain traits. For
> >> example, HiveSink implements Sink, HasState, HasCommitter, HasCompaction,
> >> HasGlobalCommitter (there are probably better names for the traits).
> >> We can still change that, as everything is experimental (and annoy
> >> connector devs), but it seems to be an orthogonal discussion.
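> >>
> >> A very rough sketch of the trait idea (all names and signatures are made
> >> up for illustration, this is not an actual API proposal): the base Sink
> >> only creates a writer, and every optional capability becomes its own
> >> interface instead of an Optional-returning method.
> >>
> >> import java.util.List;
> >>
> >> interface Writer<IN, COMM>         { void write(IN element); List<COMM> prepareCommit(); }
> >> interface Committer<COMM>          { void commit(List<COMM> committables); }
> >> interface Compactor<COMM>          { List<COMM> compact(List<COMM> committables); }
> >> interface GlobalCommitter<COMM>    { void commitGlobally(List<COMM> committables); }
> >>
> >> // The base sink only knows how to create a writer ...
> >> interface Sink<IN, COMM>           { Writer<IN, COMM> createWriter(); }
> >> // ... and each optional capability is a separate trait the sink opts into,
> >> // so a simple sink never sees the compaction-related methods at all.
> >> interface HasCommitter<COMM>       { Committer<COMM> createCommitter(); }
> >> interface HasCompaction<COMM>      { Compactor<COMM> createCompactor(); }
> >> interface HasGlobalCommitter<COMM> { GlobalCommitter<COMM> createGlobalCommitter(); }
> >>
> >> // A Hive-like file sink would then implement exactly the traits it needs:
> >> // class HiveSink implements Sink<Row, FileCommittable>,
> >> //     HasCommitter<FileCommittable>, HasCompaction<FileCommittable>,
> >> //     HasGlobalCommitter<FileCommittable> { ... }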
> >>
> >> If we compare option 2 with option 3, I'd argue that option 3 puts more
> >> strain on the connector dev's mental model than option 2. They need to map
> >> the high-level concepts of the current Sink to the low-level concepts of
> >> DataStream. They need to understand the data that is being sent between
> >> writer and committer to be able to hook in.
> >> Note that we need to move away from just sending CommT and wrap it with
> >> some metadata, such as the checkpoint id and subtask id.
> >>
> >> The dev also needs to take care of managing the state, which we may be
> >> able to abstract away in option 2 (not entirely sure). And the dev needs
> >> to understand the task life-cycle to emit the remaining committables
> >> before the job shuts down (is that even possible at the DataStream level?
> >> Are we notified on EOI, or do we expect devs to use the operator API?).
> >> Lastly, if things are done asynchronously, as you have championed, the
> >> user also needs to take care of ensuring that all async tasks are done
> >> before shutdown.
> >>
> >> *Re: 2. Repeated implementation*
> >>
> >>> The newly introduced `aggregate` can set the parallelism, so perhaps
> >>> `setUid`, `slotSharingGroup` (including resources), and `maxParallelism`
> >>> also need to be supported? If they are supported, additional work is
> >>> required, and at the same time I feel that this work is unnecessary; if
> >>> they are not, it will also increase the developers' cost of
> >>> understanding: for example, why can these operators not set these
> >>> attributes?
> >>>
> >>
> >> I assume you mean that we would be replicating API between DataStream and
> >> the Sink compactor? I wouldn't do that. I would actually also fix the
> >> parallelism to the writer's parallelism. So we just have a pre-key,
> >> aggregate, and post-key, where we can have defaults for the keys. We
> >> should provide access to the metadata of CommT through a context. I'd
> >> always run the compaction in the same slot sharing group as the writer,
> >> similar to how the committer is run. I don't see why we should take a
> >> different route with the design of the compaction than with the
> >> committer, where we abstract all these things away.
> >>
> >>> For another example, `SinkWriter` has re-introduced many implementations
> >>> that already exist in the DataStream API in order to support ‘State’,
> >>> ‘Timer’, asynchronous execution, the subtask index, etc.
> >>>
> >>
> >> I agree that the repeated Timer implementation is weird, but that could
> >> have been solved similarly to how we solved asynchronous execution, where
> >> we did not replicate interfaces but rather pulled the concept of
> >> MailboxExecutor up from datastream to core.
> >> Having specific Context objects is imho actually a good design decision:
> >> RuntimeContext is heavily overloaded and shouldn't be emulated or used.
> >>
> >>
> >>> In addition, if a new feature is introduced in the DataStream API in the
> >>> future, I think it may be necessary to consider the special operators of
> >>> the sinks separately, which may also be a burden.
> >>>
> >>
> >> Yes, option 2 has a higher maintenance cost for Flink, since Flink needs
> >> to provide these operators, but it also gives us more control over
> >> changing things in the future. If we expose the elements that we exchange
> >> between the different sink operators, we cannot change them later. Now we
> >> can easily abstract such a change away.
> >> Regarding new features in DataStream: I think this is actually an
> >> advantage. We can evolve the DataStream API without thinking about how
> >> that could screw up potential sinks.
> >>
> >> *Re: 2. Aggregator (I think parallelism 1 is OK; why multiple
> >> parallelism?)*
> >>
> >> If a user wants to aggregate with parallelism 1, they can use a constant
> >> key (which we should describe in the javadoc), and that should be the
> >> default. I think a higher parallelism makes sense if you want to compact
> >> across checkpoints, where you probably want to aggregate the files into
> >> different buckets and can process the buckets in parallel.
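> >>
> >> As a tiny sketch of the two key choices (the Committable type and its
> >> bucket accessor are made up; only the KeySelector class is from Flink):
> >>
> >> import org.apache.flink.api.java.functions.KeySelector;
> >>
> >> // Placeholder committable; the real one would carry the file metadata.
> >> interface Committable { String bucket(); }
> >>
> >> class CompactionKeys {
> >>   // Constant key: everything goes to one subtask -> parallelism-1 aggregation.
> >>   static final KeySelector<Committable, String> CONSTANT = c -> "all";
> >>   // Bucket key: committables of the same bucket meet on the same subtask,
> >>   // so different buckets can be compacted in parallel across checkpoints.
> >>   static final KeySelector<Committable, String> PER_BUCKET = Committable::bucket;
> >> }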
> >>
> >>
> >>
> >> I hope this clears things up a bit. I guess it became obvious that I
> >> prefer option 2, because it gives us more means to provide a good
> >> implementation and to help the sink developer, while also giving us
> >> flexibility in the future. Happy to go into more detail on certain points.
> >>
> >>
> >>
> >> On Thu, Nov 4, 2021 at 7:20 AM Guowei Ma <guowei....@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> Many thanks, Fabian, for drafting this FLIP! It looks very good to me. I
> >>> see that currently most of us agree with option 2, but I personally feel
> >>> that option 3 may be better :-)
> >>> I have some small concerns about option 2:
> >>>
> >>> 1. The cost for developers to understand it is relatively high.
> >>> Merging small files is very important, but from the perspective of the
> >>> sink as a whole, this requirement is only a special case. Exposing it
> >>> directly in the Sink API means that all developers may need to understand
> >>> this "special case". From my personal point of view, for a developer who
> >>> has no compaction requirements, seeing this API may make the Sink feel
> >>> more complicated. In addition, at a higher level, the first level of the
> >>> Sink API should be a relatively general abstraction; the second level
> >>> might be different abstractions for specific sink types.
> >>>
> >>> 2. Repeated implementation
> >>> The newly introduced `aggregate` can set the parallelism, so perhaps
> >>> `setUid`, `slotSharingGroup` (including resources), and `maxParallelism`
> >>> also need to be supported? If they are supported, additional work is
> >>> required, and at the same time I feel that this work is unnecessary; if
> >>> they are not, it will also increase the developers' cost of
> >>> understanding: for example, why can these operators not set these
> >>> attributes? For another example, `SinkWriter` has re-introduced many
> >>> implementations that already exist in the DataStream API in order to
> >>> support ‘State’, ‘Timer’, asynchronous execution, the subtask index, etc.
> >>> In addition, if a new feature is introduced in the DataStream API in the
> >>> future, I think it may be necessary to consider the special operators of
> >>> the sinks separately, which may also be a burden.
> >>>
> >>> 3. Might block checkpointing
> >>> For option 2, we would need to actually merge files in the committer.
> >>> However, this raises the question of whether compaction blocks
> >>> checkpointing: suppose our policy is to merge all the files produced
> >>> within 10 checkpoints. If we allow users to construct the topology, they
> >>> could merge the files asynchronously in a dedicated executor; the merge
> >>> could span multiple checkpoints (as long as it does not exceed 10), and
> >>> then the files to be renamed are emitted to the committer. But if we
> >>> merge in the committer, it has to be done before the next checkpoint,
> >>> since we do not support asynchronous commits yet. With option 3, this is
> >>> already possible today.
> >>>
> >>>
> >>> In addition, the FLIP mentioned that option 3 places a certain burden on
> >>> developers from the perspective of stream-batch unification. Could you
> >>> enlighten me a little about what that burden is? I think the current
> >>> DataStream APIs are all stream-batch unified, and the topology in option
> >>> 3 is also built with `DataStream`, so I understand that developers would
> >>> not have any additional burden. Or what is missing?
> >>>
> >>> Best,
> >>> Guowei
> >>>
> >>>
> >>> On Thu, Nov 4, 2021 at 11:16 AM Jingsong Li <jingsongl...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi Fabian,
> >>> >
> >>> > Thanks for drafting the FLIP!
> >>> >
> >>> > ## A few thoughts on user requirements
> >>> >
> >>> > 1. Compacting files from multiple checkpoints
> >>> >
> >>> > This is something users need very much.
> >>> >
> >>> > 2. Whether compaction blocks checkpointing
> >>> >
> >>> > - Some scenarios require it. For example, the user expects the output
> >>> > data to be consistent with the input data: if the checkpoint succeeds,
> >>> > they need to see how much data was output. Otherwise, when the user
> >>> > restarts a job to consume from the same source offset, they may lose
> >>> > data.
> >>> > - But I think in most cases, users are concerned about this, and we
> >>> > can delay making the data visible.
> >>> >
> >>> > 3. The end-to-end latency of data
> >>> >
> >>> > This also depends on the situation.
> >>> > - For some users, latency is very important. We'd better compact the
> >>> > data within the current checkpoint, even if it affects the checkpoint
> >>> > duration.
> >>> > - Some users think the latency doesn't matter (it is at the minute
> >>> > level): as long as the files are compacted, small files won't cause
> >>> > them trouble.
> >>> >
> >>> > So I think flexibility is important.
> >>> >
> >>> > ## A few thoughts on option 2
> >>> >
> >>> > Option 1 and option 2 seem to differ only in the aggregator in the
> >>> > middle: whether it is a separate operator or a coordinator.
> >>> >
> >>> > I would prefer option 2.
> >>> >
> >>> > If I understand correctly, the whole process should be as follows:
> >>> >
> >>> > 1. SinkWriter ->
> >>> > 2. Aggregator (I think parallelism 1 is OK; why multiple parallelism?)
> >>> > ->
> >>> > 3. LocalCommitter (does the compaction work) ->
> >>> > 4. GlobalCommitter
> >>> >
> >>> > The Aggregator can control single-checkpoint or cross-checkpoint
> >>> > compaction, whether or not to block the current checkpoint, and the
> >>> > end-to-end latency of the data (i.e., how many checkpoints to wait
> >>> > for).
> >>> >
> >>> > Best,
> >>> > Jingsong
> >>> >
> >>> > On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <
> fabianp...@ververica.com>
> >>> > wrote:
> >>> > >
> >>> > >
> >>> > > Hi David and Till,
> >>> > >
> >>> > > Thanks for your great feedback. One definitely confusing point in the
> >>> > > FLIP is who is doing the actual compaction. The compaction will not be
> >>> > > done by the CommittableAggregator operator but by the committers, so
> >>> > > it should not affect the checkpointing duration or become a
> >>> > > significant performance bottleneck, because the committers are
> >>> > > executed in parallel (also in batch mode [1]).
> >>> > >
> >>> > > I will update the FLIP to clarify this.
> >>> > >
> >>> > > > From your description I would be in favour of option 2 for the
> >>> > > > following reasons: Assuming that option 2 solves all our current
> >>> > > > problems, it seems like the least invasive change and smallest in
> >>> > > > scope. Your main concern is that it might not cover future use
> >>> > > > cases. Do you have some specific use cases in mind?
> >>> > >
> >>> > > No, I do not have anything specific in mind. I just wanted to raise
> >>> > > the point that adding more and more operators to the sink might make
> >>> > > it harder in the future to ensure that they can all be used together.
> >>> > >
> >>> > > > What I am missing a bit from the description is how option 2 will
> >>> > > > behave wrt checkpoints and the batch execution mode.
> >>> > >
> >>> > > My idea was to always invoke CommittableAggregator#aggregate on every
> >>> > > checkpoint and on endOfInput. In the batch case, the aggregation is
> >>> > > done only once, on all committables.
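> >>> > >
> >>> > > Roughly, the hook I have in mind looks like this (sketch only, the
> >>> > > name and signature are not final):
> >>> > >
> >>> > > import java.util.List;
> >>> > >
> >>> > > // Called on every checkpoint and on endOfInput; returns the (possibly
> >>> > > // merged) committables that are handed to the committer. In batch
> >>> > > // mode it is called exactly once, on all committables.
> >>> > > public interface CommittableAggregator<CommT> {
> >>> > >   List<CommT> aggregate(List<CommT> committables);
> >>> > > }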
> >>> > >
> >>> > >
> >>> > > > A few thoughts on option 2)
> >>> > > >
> >>> > > > File compaction is by definition a quite costly, IO-bound operation.
> >>> > > > If I understand the proposal correctly, the aggregation itself would
> >>> > > > run during the operator (aggregator) checkpoint. Would this
> >>> > > > significantly increase the checkpoint duration?
> >>> > > >
> >>> > > > Compaction between different sub-tasks incurs additional network IO
> >>> > > > (to fetch the raw non-compacted files from the remote filesystem),
> >>> > > > so this could quickly become a bottleneck. Basically we're
> >>> > > > decreasing the sink parallelism (possible throughput) to the
> >>> > > > parallelism of the aggregator.
> >>> > >
> >>> > > Hopefully these concerns are covered by the explanation at the
> >>> > > beginning.
> >>> > >
> >>> > > > To be really effective here, compaction would ideally be able to
> >>> > > > compact files from multiple checkpoints. However, there is a huge
> >>> > > > tradeoff between latency and efficiency (especially with exactly
> >>> > > > once). Is this something worth exploring?
> >>> > >
> >>> > > I agree with you that by enabling compaction across checkpoints the
> >>> > > latency can increase, because files might be committed several
> >>> > > checkpoints later. I guess the best we can do is to let the user
> >>> > > configure the behaviour. By configuring the checkpointing interval and
> >>> > > the desired file size, the user can already affect the latency.
> >>> > > Does this answer your questions? I am not fully sure what you are
> >>> > > referring to with efficiency. @dvmk
> >>> > >
> >>> > > > I hope that with option 2, we can support both use cases: single
> >>> > > > task compaction as well as cross task compaction if needed.
> >>> > > > Similarly for single checkpoint compaction as well as cross
> >>> > > > checkpoint compaction.
> >>> > >
> >>> > > Compaction across subtasks should be controllable by the parallelism
> >>> > > of the CommittableAggregator operator, i.e., a parallelism of 2 can
> >>> > > reduce the computational complexity but might not compute the best
> >>> > > compaction.
> >>> > >
> >>> > > Best,
> >>> > > Fabian
> >>> > >
> >>> > > [1] https://github.com/apache/flink/pull/17536
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Best, Jingsong Lee
> >>> >
> >>>
> >>
>
