------------------ Original Message ------------------
From: "dev" <[email protected]>;
Sent: December 13, 2021 (Monday), 11:59
To: "dev" <[email protected]>;
Subject: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction
Hi all,
After a lot of discussions with different people, we received very fruitful
feedback and reworked the ideas behind this FLIP. Initially, we had
the impression that the compaction problem is solvable by a single
topology that we can reuse across different sinks. We now have a
better understanding that different external systems require different
compaction mechanisms, e.g., Hive requires compaction before finally
registering the file in the metastore, whereas Iceberg registers the
files first and compacts them lazily afterwards.
Considering all these different views, we came up with a design that
builds upon what @[email protected] and @[email protected] have
proposed at the beginning. We allow inserting custom topologies before
and after the SinkWriters and Committers. Furthermore, we do not see
it as a downside: the Sink interfaces that will expose the DataStream
to the user reside in flink-streaming-java, in contrast to the basic
Sink interfaces that reside in flink-core, and we deem them to be used
only by expert users.
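To make the difference between the two compaction styles concrete, here is a
minimal sketch in Python rather than Java. It is not the Flink Sink API: every
name in it (run_sink, pre_commit, post_commit, registering_committer) is
hypothetical and only stands in for "a custom topology before or after the
committer". The committer logs how many files it registers, so the Hive-like
and the Iceberg-like setup can be told apart.

```python
from typing import Callable, List

# Hypothetical model, not the Flink API: a "file" is just a list of records.
File = List[int]
Stage = Callable[[List[File]], List[File]]


def passthrough(files: List[File]) -> List[File]:
    """Default topology: forward the files unchanged."""
    return list(files)


def compact(files: List[File]) -> List[File]:
    """Merge all given files into a single file."""
    merged = [record for f in files for record in f]
    return [merged] if merged else []


def run_sink(records, commit, pre_commit: Stage = passthrough,
             post_commit: Stage = passthrough) -> List[File]:
    """Run one checkpoint's records through writer, committer and the hooks."""
    written = [[r] for r in records]          # writer: one small file per record
    committed = commit(pre_commit(written))   # committer registers the files
    return post_commit(committed)


def registering_committer(log: List[int]):
    """Committer that records how many files it registered per call."""
    def commit(files: List[File]) -> List[File]:
        log.append(len(files))
        return list(files)
    return commit


hive_log, iceberg_log = [], []
# Hive-like: compact before the files are registered.
hive_result = run_sink([1, 2, 3], registering_committer(hive_log),
                       pre_commit=compact)
# Iceberg-like: register the small files first, compact afterwards.
iceberg_result = run_sink([1, 2, 3], registering_committer(iceberg_log),
                          post_commit=compact)
```

Both runs end with one compacted file, but the Hive-like committer registers a
single file while the Iceberg-like committer registers three small ones first.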
Moreover, we also wanted to remove the global committer from the
unified Sink interfaces and replace it with a custom post-commit
topology. Unfortunately, we cannot do it without breaking the Sink
interface since the GlobalCommittables are part of the parameterized
Sink interface. Thus, we propose building a new Sink V2 interface
consisting of composable interfaces that do not offer the
GlobalCommitter anymore. We will implement a utility to extend a Sink
with a post-commit topology that mimics the behavior of the GlobalCommitter.
The new Sink V2 provides the same sort of methods as the Sink V1
interface, so a migration of sinks that do not use the GlobalCommitter
should be very easy.
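As a rough illustration of how a post-commit topology could mimic a global
committer, consider the following Python sketch. It assumes a single-instance
stage that receives everything the parallel committers emit; the class and
method names (GlobalCommitStage, on_committed, on_checkpoint_complete) are made
up for this example and are not the utility proposed in the FLIP.

```python
from typing import Callable, Dict, List


class GlobalCommitStage:
    """Hypothetical single-instance post-commit stage.

    It gathers the committables emitted by all committer subtasks and hands
    them to one global commit call per checkpoint.
    """

    def __init__(self, global_commit: Callable[[int, List[str]], None]):
        self._global_commit = global_commit
        self._pending: Dict[int, List[str]] = {}

    def on_committed(self, checkpoint_id: int, committable: str) -> None:
        # Called once per committable that a committer subtask has committed.
        self._pending.setdefault(checkpoint_id, []).append(committable)

    def on_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Flush every checkpoint up to and including this one, in order.
        ready = sorted(cp for cp in self._pending if cp <= checkpoint_id)
        for cp in ready:
            self._global_commit(cp, self._pending.pop(cp))


calls = []
stage = GlobalCommitStage(lambda cp, items: calls.append((cp, sorted(items))))
stage.on_committed(1, "file-b")  # from committer subtask 1
stage.on_committed(1, "file-a")  # from committer subtask 0
stage.on_committed(2, "file-c")
stage.on_checkpoint_complete(1)
calls_after_first = list(calls)
stage.on_checkpoint_complete(2)
```

The first completion commits only the two files of checkpoint 1 in one call;
the file of checkpoint 2 waits until its own checkpoint completes.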
We plan to keep the existing Sink V1 interfaces so that externally
built sinks do not break. As part of this FLIP, we will migrate all the
connectors inside the main repository to the new Sink V2 API.
The FLIP document is also updated and includes the proposed changes.
Looking forward to your feedback,
Fabian
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
On Thu, Dec 2, 2021 at 10:15 AM Roman Khachatryan <[email protected]> wrote:
>
> Thanks for clarifying (I was initially confused by merging state files
> rather than output files).
>
> > At some point, Flink will definitely have some WAL adapter that can
> > turn any sink into an exactly-once sink (with some caveats). For now, we keep
> > that as an orthogonal solution as it has a rather high price (bursty workload
> > with high latency). Ideally, we can keep the compaction asynchronous...
>
> Yes, that would be something like a WAL. I agree that it would have a
> different set of trade-offs.
>
>
> Regards,
> Roman
>
> On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise <[email protected]> wrote:
> >>
> >> > One way to avoid write-read-merge is by wrapping SinkWriter with
> >> > another one, which would buffer input elements in a temporary storage
> >> > (e.g. local file) until a threshold is reached; after that, it would
> >> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> >> > earlier, it would send written data to some aggregator.
> >>
> >> I think perhaps this seems to be a kind of WAL method? Namely we first
> >> write the elements to some WAL logs and persist them on checkpoint
> >> (in snapshot or remote FS), or we directly write WAL logs to the remote
> >> FS eagerly.
> >>
> > At some point, Flink will definitely have some WAL adapter that can
> > turn any sink into an exactly-once sink (with some caveats). For now, we keep
> > that as an orthogonal solution as it has a rather high price (bursty workload
> > with high latency). Ideally, we can keep the compaction asynchronous...
> >
> > On Mon, Nov 29, 2021 at 8:52 AM Yun Gao <[email protected]> wrote:
> >>
> >> Hi,
> >>
> >> @Roman, very sorry for the very late response.
> >>
> >> > Merging artifacts from multiple checkpoints would apparently
> >> > require multiple concurrent checkpoints
> >>
> >> I think it might not need concurrent checkpoints: suppose some
> >> operator (like the committer aggregator in option 2) maintains
> >> the list of files to merge. It could store that list in its state,
> >> and after several checkpoints are done and we have enough files,
> >> we could merge all the files in the list.
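The idea quoted above can be sketched as follows. This is only an illustrative
Python model under stated assumptions: the class FileMergeAggregator, its
methods and the file-count threshold min_files are hypothetical, and the
"state" is a plain list instead of Flink operator state.

```python
from typing import List


class FileMergeAggregator:
    """Hypothetical aggregator that keeps the list of files to merge in state."""

    def __init__(self, min_files: int):
        self.min_files = min_files
        self.pending: List[str] = []

    def add_files(self, files: List[str]) -> None:
        # Files produced by the writers since the previous checkpoint.
        self.pending.extend(files)

    def snapshot_state(self) -> List[str]:
        # Stored with every checkpoint; no merging happens here.
        return list(self.pending)

    def restore_state(self, state: List[str]) -> None:
        # On recovery the pending list is taken from the last checkpoint.
        self.pending = list(state)

    def on_checkpoint_complete(self) -> List[str]:
        """Return the files to merge once enough have accumulated, else []."""
        if len(self.pending) < self.min_files:
            return []
        to_merge, self.pending = self.pending, []
        return to_merge


agg = FileMergeAggregator(min_files=3)
agg.add_files(["cp1-a"])
state_cp1 = agg.snapshot_state()
merge_cp1 = agg.on_checkpoint_complete()   # not enough files yet
agg.add_files(["cp2-a", "cp2-b"])
state_cp2 = agg.snapshot_state()
merge_cp2 = agg.on_checkpoint_complete()   # threshold reached

# Recovery from checkpoint 1 only needs the stored list, not a second
# in-flight checkpoint.
recovered = FileMergeAggregator(min_files=3)
recovered.restore_state(state_cp1)
```

In this model the checkpoints run strictly one after another; the files simply
stay in the stored list until the threshold is reached.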
> >>
> >> > Asynchronous merging in an aggregator would require some resolution
> >> > logic on recovery, so that a merged artifact can be used if the
> >> > original one was deleted. Otherwise, wouldn't recovery fail because
> >> > some artifacts are missing?
> >> > We could also defer deletion until the "compacted" checkpoint is
> >> > subsumed - but isn't it too late, as it will be deleted anyways once
> >> > subsumed?
> >>
> >> I think logically we could delete the original files once the "compacted"
> >> checkpoint (the one that finishes merging the files and records the result
> >> in the checkpoint) is completed, in all the options. If there is a failover
> >> before it completes, we could restart the merging, and if there is a failover
> >> after it, the files are already recorded in the checkpoint.
> >>
> >> > One way to avoid write-read-merge is by wrapping SinkWriter with
> >> > another one, which would buffer input elements in a temporary storage
> >> > (e.g. local file) until a threshold is reached; after that, it would
> >> > invoke the original SinkWriter. And if a checkpoint barrier comes in
> >> > earlier, it would send written data to some aggregator.
> >>
> >> I think perhaps this seems to be a kind of WAL method? Namely we first
> >> write the elements to some WAL logs and persist them on checkpoint
> >> (in snapshot or remote FS), or we directly write WAL logs to the remote
> >> FS eagerly.
> >>
> >> Sorry if I have misunderstood something.
> >>
> >> Best,
> >> Yun
> >>
> >>
> >> ------------------------------------------------------------------
> >> From: Roman Khachatryan <[email protected]>
> >> Send Time: 2021 Nov. 9 (Tue.) 22:03
> >> To: dev <[email protected]>
> >> Subject: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction
> >>
> >> Hi everyone,
> >>
> >> Thanks for the proposal and the discussion, I have some remarks:
> >> (I'm not very familiar with the new Sink API but I thought about the
> >> same problem in context of the changelog state backend)
> >>
> >> 1. Merging artifacts from multiple checkpoints would apparently
> >> require multiple concurrent checkpoints (otherwise, a new checkpoint
> >> won't be started before completing the previous one; and the previous
> >> one can't be completed before durably storing the artifacts). However,
> >> concurrent checkpoints are currently not supported with Unaligned
> >> checkpoints (this is besides increasing e2e-latency).
> >>
> >> 2. Asynchronous merging in an aggregator would require some resolution
> >> logic on recovery, so that a merged artifact can be used if the
> >> original one was deleted. Otherwise, wouldn't recovery fail because
> >> some artifacts are missing?
> >> We could also defer deletion until the "compacted" checkpoint is
> >> subsumed - but isn't it too late, as it will be deleted anyways once
> >> subsumed?
> >>
> >> 3. Writing small files, then reading and merging them for *every*
> >> checkpoint seems worse than only reading them on recovery. I guess I'm
> >> missing some cases of reading, so to me it would make sense to mention
> >> these cases explicitly in the FLIP motivation section.
> >>
> >> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
> >> another one, which would buffer input elements in a temporary storage
> >> (e.g. local file) until a threshold is reached; after that, it would
> >> invoke the original SinkWriter. And if a checkpoint barrier comes in
> >> earlier, it would send written data to some aggregator. It will
> >> increase checkpoint delay (async phase) compared to the current Flink;
> >> but not compared to the write-read-merge solution, IIUC.
> >> Then such "BufferingSinkWriters" could aggregate input elements from
> >> each other, potentially recursively (I mean something like
> >> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png
> >> )
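A minimal Python sketch of the wrapping writer described in remark 4, under
simplifying assumptions: the buffer lives in memory instead of a local file,
the threshold counts elements, and the two callbacks (the wrapped writer and
the aggregator) are hypothetical stand-ins. Recursive aggregation between
writers is not modelled.

```python
from typing import Callable, List


class BufferingSinkWriter:
    """Hypothetical wrapper that buffers elements before invoking the real writer."""

    def __init__(self,
                 wrapped_write: Callable[[List[str]], None],
                 send_to_aggregator: Callable[[List[str]], None],
                 threshold: int):
        self._wrapped_write = wrapped_write
        self._send_to_aggregator = send_to_aggregator
        self._threshold = threshold
        self._buffer: List[str] = []   # stands in for the temporary storage

    def write(self, element: str) -> None:
        self._buffer.append(element)
        if len(self._buffer) >= self._threshold:
            # Threshold reached: invoke the original writer with a full batch.
            self._wrapped_write(self._buffer)
            self._buffer = []

    def on_checkpoint_barrier(self) -> None:
        # Barrier arrived before the threshold: hand the partial batch over.
        if self._buffer:
            self._send_to_aggregator(self._buffer)
            self._buffer = []


written, aggregated = [], []
writer = BufferingSinkWriter(written.append, aggregated.append, threshold=3)
for element in ["a", "b", "c", "d"]:
    writer.write(element)
writer.on_checkpoint_barrier()
```

Here the first three elements go to the wrapped writer as one batch, and the
leftover element is sent to the aggregator when the barrier arrives.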
> >>
> >> 5. Reducing the number of files by reducing aggregator parallelism as
> >> opposed to merging on reaching size threshold will likely be less
> >> optimal and more difficult to configure. OTOH, thresholds might be more
> >> difficult to implement and (with recursive merging) would incur higher
> >> latency. Maybe that's also something to decide explicitly or at least
> >> mention in the FLIP.
> >>
> >>
> >>
> >> Regards,
> >> Roman
> >>
> >>
> >> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei <[email protected]> wrote:
> >> >
> >> > Hi Fabian,
> >> >
> >> > Thanks for drafting the FLIP and trying to support small file compaction. I
> >> > think this feature is very urgent and valuable for users (at least for me).
> >> >
> >> > Currently I am trying to support streaming rewrite (compact) for Iceberg in
> >> > PR#3323 <https://github.com/apache/iceberg/pull/3323>. As Steven mentioned,
> >> > the Iceberg sink writes and compacts data through the following steps:
> >> > Step-1: Some parallel data writers (sinkers) write streaming data as files.
> >> > Step-2: A single-parallelism data file committer commits the completed
> >> > files as soon as possible to make them available.
> >> > Step-3: Some parallel file rewriters (compactors) collect committed files
> >> > from multiple checkpoints and rewrite (compact) them together once the
> >> > total file size or number of files reaches the threshold.
> >> > Step-4: A single-parallelism rewrite (compact) result committer commits
> >> > the rewritten (compacted) files to replace the old files and make them
> >> > available.
> >> >
> >> >
> >> > If Flink wants to support small file compaction, I think some key points
> >> > are necessary:
> >> >
> >> > 1, Compact files from multiple checkpoints.
> >> > I totally agree with Jingsong, because the completed file size usually cannot
> >> > reach the threshold in a single checkpoint. Especially for a partitioned
> >> > table, we need to compact the files of each partition, but usually the file
> >> > size of each partition will be different and may not reach the merge
> >> > threshold. If we compact these files within a single checkpoint, regardless of
> >> > whether the total file size reaches the threshold, then the value of
> >> > compacting will be diminished and we will still get small files because
> >> > these compacted files do not reach the target size. So we need the
> >> > compactor to collect committed files from multiple checkpoints and compact
> >> > them once they reach the threshold.
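The per-partition behaviour described in point 1 could look roughly like the
Python sketch below. It is not taken from the Iceberg PR: the class
PartitionCompactionPlanner, its methods and the byte-based target_bytes
threshold are hypothetical and only show why files have to be collected across
checkpoints.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class PartitionCompactionPlanner:
    """Hypothetical planner that collects committed files per partition."""

    def __init__(self, target_bytes: int):
        self.target_bytes = target_bytes
        self._files: Dict[str, List[Tuple[str, int]]] = defaultdict(list)

    def add_committed(self, partition: str, path: str, size: int) -> None:
        self._files[partition].append((path, size))

    def plan(self) -> Dict[str, List[str]]:
        """Return the files to compact for every partition that is large enough.

        Partitions below the target keep accumulating files from later
        checkpoints instead of being compacted into another small file.
        """
        ready: Dict[str, List[str]] = {}
        for partition, files in list(self._files.items()):
            if sum(size for _, size in files) >= self.target_bytes:
                ready[partition] = [path for path, _ in files]
                del self._files[partition]
        return ready

    def pending(self, partition: str) -> List[str]:
        return [path for path, _ in self._files.get(partition, [])]


planner = PartitionCompactionPlanner(target_bytes=100)
# Checkpoint 1: neither partition has enough data yet.
planner.add_committed("dt=1", "a", 60)
planner.add_committed("dt=2", "b", 10)
plan_cp1 = planner.plan()
# Checkpoint 2: partition dt=1 crosses the target, dt=2 keeps waiting.
planner.add_committed("dt=1", "c", 50)
planner.add_committed("dt=2", "d", 10)
plan_cp2 = planner.plan()
```

Only partition dt=1 is compacted after the second checkpoint, while the two
small files of dt=2 stay pending.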
> >> >
> >> > 2, Separate write phase and compact phase.
> >> > Users usually hope the data becomes available as soon as possible, and the
> >> > end-to-end latency is very important. I think we need to separate the
> >> > write and compact phases. The write phase includes Step-1 and Step-2:
> >> > we sink data as files and commit them per checkpoint, regardless
> >> > of the file size. That ensures the data will be available ASAP.
> >> > The compact phase includes Step-3 and Step-4: the compactor should
> >> > collect committed files from multiple checkpoints and compact them
> >> > asynchronously once they reach the threshold, and the compact committer
> >> > will commit the compaction result in the next checkpoint. We compact the
> >> > committed files asynchronously because we don't want the compaction to
> >> > affect the data sink or the whole pipeline.
> >> >
> >> > 3, Exactly-once guarantee between write and compact phase.
> >> > Once we separate the write phase and the compact phase, we need to consider
> >> > how to guarantee exactly-once semantics between the two phases. We must not
> >> > lose any data or files in the compactor (Step-3) in any case, which would make
> >> > the compaction result inconsistent with the data before compaction. I think
> >> > Flink should provide an easy-to-use interface to make that easier.
> >> >
> >> > 4, Metadata operation and compaction result validation.
> >> > The compact phase may involve not only compacting files but also a lot
> >> > of metadata operations; for example, Iceberg needs to read/write manifests
> >> > and do MOR. We also need some interface that lets users validate the
> >> > compaction result. I think these points should be
> >> > considered when we design the compaction API.
> >> >
> >> >
> >> > Back to FLIP-191, option 1 looks very complicated while option 2 is
> >> > relatively simple, but neither of these two solutions separates the write
> >> > phase from the compact phase. So I think we should consider the points I
> >> > mentioned above. And if you have any other questions you can always feel
> >> > free to reach out to me!
> >> >
> >> > BR,
> >> > Reo
> >> >
> >> > Fabian Paul <[email protected]> wrote on Mon, Nov 8, 2021 at 7:59:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Thanks for the lively discussions. I am really excited to see so many
> >> > > people participating in this thread. It also underlines that many people
> >> > > would like to see a solution soon.
> >> > >
> >> > > I have updated the FLIP and removed the parallelism configuration because
> >> > > it is unnecessary: users can configure a constant exchange key to send all
> >> > > committables to only one committable aggregator.
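The effect of a constant exchange key can be illustrated with a small Python
sketch. It assumes a simplified hash partitioning (CRC32 of the key modulo the
parallelism), which is not how Flink assigns keys to subtasks; the function
route and its parameters are hypothetical.

```python
import zlib
from typing import Callable, Dict, List


def route(committables: List[str],
          key_of: Callable[[str], str],
          parallelism: int) -> Dict[int, List[str]]:
    """Assign each committable to an aggregator subtask by hashing its key."""
    assignment: Dict[int, List[str]] = {}
    for committable in committables:
        key = key_of(committable).encode("utf-8")
        subtask = zlib.crc32(key) % parallelism
        assignment.setdefault(subtask, []).append(committable)
    return assignment


files = ["dt=1/a", "dt=1/b", "dt=2/c", "dt=3/d"]

# Constant key: every committable lands on the same aggregator subtask,
# regardless of the configured parallelism.
single = route(files, lambda _: "all", parallelism=4)

# Partition as key: files of one partition stay together, but different
# partitions may be handled by different aggregator subtasks.
by_partition = route(files, lambda f: f.split("/")[0], parallelism=4)
```

With the constant key exactly one subtask receives all four files, so a
separate parallelism setting for the aggregator is not required to get a
single aggregator.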
> >> > >
> >> > >
> >> > > 1. Burden for developers w.r.t batch stream unification.
> >> > >
> >> > > @yun @guowei, from a theoretical point of view you are right: by exposing the
> >> > > DataStream API in the sink, users have the full power to write correct batch
> >> > > and streaming sinks. I think in reality a lot of users still struggle to build
> >> > > pipelines, e.g., an operator pipeline, that work correctly in both streaming
> >> > > and batch mode. Another problem I see with exposing deeper concepts is that
> >> > > we cannot do any optimization because we cannot reason about how sinks will
> >> > > be built in the future.
> >> > >
> >> > > We should also try to steer users towards using only `Functions` to give
> >> > > us more flexibility to swap the internal operator representation. I agree with
> >> > > @yun that we should try to make the `ProcessFunction` more versatile to work
> >> > > towards that goal, but I see this as unrelated to the FLIP.
> >> > >
> >> > >
> >> > > 2. Regarding Commit / Global commit
> >> > >
> >> > > I envision the global committer to be specific to the data lake
> >> > > solution you want to write to. However, it is entirely orthogonal to the
> >> > > compaction.
> >> > > Currently, I do not expect this FLIP to introduce any changes w.r.t. the
> >> > > Global commit.
> >> > >
> >> > >
> >> > > 3. Regarding the case of trans-checkpoints merging
> >> > >
> >> > > @yun, as a user, I would expect that if the committer receives files to
> >> > > merge/commit in a checkpoint, these are also finished when the checkpoint
> >> > > finishes. I think all sinks currently rely on this principle, e.g., KafkaSink
> >> > > needs to commit all open transactions before the next checkpoint can happen.
> >> > >
> >> > > Maybe in the future, we can somehow move the Committer#commit call to an
> >> > > asynchronous execution, but we should discuss it in a separate thread.
> >> > >
> >> > > > We probably should first describe the different causes of small files and
> >> > > > what problems was this proposal trying to solve. I wrote a data shuffling
> >> > > > proposal [1] for Flink Iceberg sink (shared with Iceberg community [2]). It
> >> > > > can address small files problems due to skewed data distribution across
> >> > > > Iceberg table partitions. Streaming shuffling before writers (to files) is
> >> > > > typically more efficient than post-write file compaction (which involves
> >> > > > read-merge-write). It is usually cheaper to prevent a problem (small files)
> >> > > > than fixing it.
> >> > >
> >> > >
> >> > > @steven you are raising a good point, although I think only using a
> >> > > customizable shuffle won't address the generation of small files. One
> >> > > assumption is that the sink generates at least one file per subtask, which
> >> > > can already be too many. Another problem is that with low checkpointing
> >> > > intervals, the files do not reach the required size. The latter point is
> >> > > probably addressable by changing the checkpoint interval, which might be
> >> > > inconvenient for some users.
> >> > >
> >> > > > The sink coordinator checkpoint problem (mentioned in option 1) would be
> >> > > > great if Flink can address it. In the spirit of source (enumerator-reader)
> >> > > > and sink (writer-coordinator) duality, sink coordinator checkpoint should
> >> > > > happen after the writer operator. This would be a natural fit to support
> >> > > > global committer in FLIP-143. It is probably an orthogonal matter to this
> >> > > > proposal.
> >> > >
> >> > >
> >> > > To me the question here is what the benefits of having a coordinator are in
> >> > > comparison to a global committer/aggregator operator.
> >> > >
> >> > > > Personally, I am usually in favor of keeping streaming ingestion (to data
> >> > > > lake) relatively simple and stable. Also sometimes compaction and sorting
> >> > > > are performed together in data rewrite maintenance jobs to improve read
> >> > > > performance. In that case, the value of compacting (in Flink streaming
> >> > > > ingestion) diminishes.
> >> > >
> >> > >
> >> > > I agree it is always possible to have scheduled maintenance jobs taking
> >> > > care of your data, e.g., doing compaction. Unfortunately, the downside is that
> >> > > you have to rewrite your data after it is already available for other
> >> > > downstream consumers. I guess this can lead to all kinds of visibility
> >> > > problems. I am also surprised that you personally are a fan of this approach
> >> > > and, on the other hand, are developing the Iceberg sink, which goes somewhat
> >> > > against your mentioned principle of keeping the sink simple.
> >> > >
> >> > > > Currently, it is unclear from the doc and this thread where the compaction
> >> > > > is actually happening. Jingsong's reply described one model
> >> > > > writer (parallel) -> aggregator (single-parallelism compaction planner) ->
> >> > > > compactor (parallel) -> global committer (single-parallelism)
> >> > >
> >> > >
> >> > > My idea of the topology is very similar to the one outlined by Jingsong. The
> >> > > compaction will happen in the committer operator.
> >> > >
> >> > > >
> >> > > > In the Iceberg community, the following model has been discussed. It is
> >> > > > better for Iceberg because it won't delay the data availability.
> >> > > > writer (parallel) -> global committer for append (single parallelism) ->
> >> > > > compactor (parallel) -> global committer for rewrite commit (single
> >> > > > parallelism)
> >> > >
> >> > >
> >> > > From a quick glimpse, it seems that the exact same topology is possible to
> >> > > express with the committable aggregator, but this definitely depends on
> >> > > the exact setup.
> >> > >
> >> > > Best,
> >> > > Fabian
> >>