Hi Fabian,

Many thanks for the update! In general, the latest version looks good from
my side, and I think using separate feature interfaces would be much easier to
understand and extend in the future. I still have some open questions on the
details, though:

1. The first one is whether we can support end-to-end exactly-once with a
post-commit topology in batch mode. In batch mode, we can currently only
commit all the transactions after the whole job has finished. Otherwise, if
there is a JM failover, or the writer / committer gets restarted due to
non-determinism (A -> [B1, B2], where A and B1 have finished and B2 failed:
if -> is rebalance / random / rescale, all of A, B1 and B2 would be
restarted), there might be duplicate records. One idea discussed previously
was to move the committer and global committer to the operator coordinator,
but if it is a topology, we might need some other kind of solution?
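
To make the non-determinism in point 1 concrete, here is a minimal sketch in plain Java (no Flink APIs). It assumes the -> exchange is a round-robin rebalance and that A may emit the same records in a different order when it is re-run; all class and method names are made up for illustration. It shows that if B1's output were committed early and only B2 were re-run, B2 could write records that B1 has already committed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: models a round-robin exchange A -> [B1, B2]. */
final class RebalanceReplaySketch {

    /** Round-robin "rebalance": the i-th emitted record goes to subtask i % parallelism. */
    static List<List<String>> rebalance(List<String> emitted, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < emitted.size(); i++) {
            perSubtask.get(i % parallelism).add(emitted.get(i));
        }
        return perSubtask;
    }

    /**
     * Records written twice if B1's first-attempt output was committed early
     * and only B2 is re-run against a re-executed A.
     */
    static Set<String> duplicatesAfterPartialRestart(
            List<String> firstAttempt, List<String> secondAttempt) {
        List<String> committedByB1 = rebalance(firstAttempt, 2).get(0);
        List<String> rewrittenByB2 = rebalance(secondAttempt, 2).get(1);
        Set<String> duplicates = new HashSet<>(committedByB1);
        duplicates.retainAll(rewrittenByB2);
        return duplicates;
    }
}
```

With a first attempt emitting r1, r2, r3, r4 and a re-run emitting r2, r1, r4, r3, the records r1 and r3 would be written twice, which is why A, B1 and B2 have to restart together and the commits have to wait until the job has finished.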

2. I would also like to double-check the compatibility story: since the old
sink is also named Sink, do we want to put the Sink V2 in a new package?
Besides, since we might want to keep only one `sinkTo(Sink<?> sink)`, perhaps
we also need to make Sink V1 a subclass of Sink V2 that extends the stateful
and two-phase-commit sinks, right?

3. I would also like to confirm our thoughts on the `DataStreamSink` returned
by the sinkTo method. The main issue is how we implement methods like
`setParallelism` or `setMaxParallelism`, since the sink would now be
translated into multiple transformations. Perhaps we could make them the
default values for all the transformations of the sink? A related issue
concerns the Iceberg sink: I think it needs exactly one committer to avoid
contention on the optimistic locks (which would cause performance
degradation), so it would need N writers with 1 committer. To build such a
topology, perhaps we need to add new methods to specify the parallelism of
the writers and committers separately?
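
As a rough illustration of point 3, the sketch below models "one default for all transformations, with optional per-stage overrides" in plain Java. `SinkParallelismSketch` and its methods are hypothetical names and not part of `DataStreamSink` or any other Flink API; the real design may look quite different.

```java
import java.util.OptionalInt;

/** Hypothetical per-stage parallelism settings; not an existing Flink class. */
final class SinkParallelismSketch {
    private final int defaultParallelism;
    private final OptionalInt writerOverride;
    private final OptionalInt committerOverride;

    private SinkParallelismSketch(
            int defaultParallelism, OptionalInt writerOverride, OptionalInt committerOverride) {
        this.defaultParallelism = requirePositive(defaultParallelism);
        this.writerOverride = writerOverride;
        this.committerOverride = committerOverride;
    }

    private static int requirePositive(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        return parallelism;
    }

    /** Mirrors a single setParallelism call: the value applies to every stage. */
    static SinkParallelismSketch of(int defaultParallelism) {
        return new SinkParallelismSketch(
                defaultParallelism, OptionalInt.empty(), OptionalInt.empty());
    }

    /** Overrides only the writer stage. */
    SinkParallelismSketch withWriterParallelism(int parallelism) {
        return new SinkParallelismSketch(
                defaultParallelism, OptionalInt.of(requirePositive(parallelism)), committerOverride);
    }

    /** Overrides only the committer stage, e.g. 1 for an Iceberg-style sink. */
    SinkParallelismSketch withCommitterParallelism(int parallelism) {
        return new SinkParallelismSketch(
                defaultParallelism, writerOverride, OptionalInt.of(requirePositive(parallelism)));
    }

    int writerParallelism() {
        return writerOverride.orElse(defaultParallelism);
    }

    int committerParallelism() {
        return committerOverride.orElse(defaultParallelism);
    }
}
```

For the Iceberg case, `SinkParallelismSketch.of(8).withCommitterParallelism(1)` would describe 8 writers feeding a single committer, while sinks that never call an override keep the default on every stage.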

Best,
Yun



 ------------------Original Mail ------------------
Sender:Fabian Paul <fp...@apache.org>
Send Date:Mon Dec 13 23:59:43 2021
Recipients:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small 
file compaction
Hi all,



After a lot of discussions with different people, we received very fruitful
feedback and reworked the ideas behind this FLIP. Initially, we had
the impression that the compaction problem is solvable by a single
topology that we can reuse across different sinks. We now have a
better understanding that different external systems require different
compaction mechanisms, e.g. Hive requires compaction before finally
registering the files in the metastore, whereas Iceberg registers the
files first and compacts them lazily afterwards.



Considering all these different views, we came up with a design that
builds upon what @guowei....@gmail.com and @yungao...@aliyun.com
proposed at the beginning. We allow inserting custom topologies before
and after the SinkWriters and Committers. Furthermore, we do not see
this as a downside: the Sink interfaces that expose the DataStream
to the user reside in flink-streaming-java, in contrast to the basic
Sink interfaces that reside in flink-core, and we deem them to be used
only by expert users.



Moreover, we also wanted to remove the global committer from the
unified Sink interfaces and replace it with a custom post-commit
topology. Unfortunately, we cannot do that without breaking the Sink
interface, since the GlobalCommittables are part of the parameterized
Sink interface. Thus, we propose building a new Sink V2 interface
consisting of composable interfaces that no longer offer the
GlobalCommitter. We will implement a utility to extend a Sink
with a post-commit topology that mimics the behavior of the GlobalCommitter.
The new Sink V2 provides the same sort of methods as the Sink V1
interface, so migrating sinks that do not use the GlobalCommitter
should be very easy.
We plan to keep the existing Sink V1 interfaces so as not to break
externally built sinks. As part of this FLIP, we migrate all the
connectors inside the main repository to the new Sink V2 API.
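
To give a feel for what "composable interfaces" could mean here, below is a minimal sketch in plain Java. Every name in it (`SketchSink`, `SketchTwoPhaseCommittingSink`, `SketchWithPostCommit`, `SketchTopologyPlanner`) is hypothetical and only stands in for the idea; the actual Sink V2 interfaces and their signatures are defined in the FLIP document, not here. The point is that a sink opts into features by implementing extra interfaces, and the framework inspects which ones are present when it builds the topology.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer and committer abstractions, for illustration only. */
interface SketchWriter<InputT> { void write(InputT element); }
interface SketchCommitter<CommT> { void commit(List<CommT> committables); }

/** Base interface: every sink can at least create a writer. */
interface SketchSink<InputT> { SketchWriter<InputT> createWriter(); }

/** Optional feature: the sink commits in two phases. */
interface SketchTwoPhaseCommittingSink<InputT, CommT> extends SketchSink<InputT> {
    SketchCommitter<CommT> createCommitter();
}

/** Optional feature: a hook after commit, standing in for a post-commit topology. */
interface SketchWithPostCommit<CommT> {
    void afterCommit(List<CommT> committed);
}

/** The "framework" side: derive the stages from the interfaces a sink implements. */
final class SketchTopologyPlanner {
    static List<String> plan(SketchSink<?> sink) {
        List<String> stages = new ArrayList<>();
        stages.add("writer");
        if (sink instanceof SketchTwoPhaseCommittingSink) {
            stages.add("committer");
        }
        if (sink instanceof SketchWithPostCommit) {
            stages.add("post-commit");
        }
        return stages;
    }
}

/** A demo sink that opts into both optional features. */
final class DemoTableSink
        implements SketchTwoPhaseCommittingSink<String, String>, SketchWithPostCommit<String> {
    @Override public SketchWriter<String> createWriter() { return element -> { }; }
    @Override public SketchCommitter<String> createCommitter() { return committables -> { }; }
    @Override public void afterCommit(List<String> committed) { }
}
```

In this sketch a sink that only implements `SketchSink` is planned as a single writer stage, while `DemoTableSink` gets writer, committer and post-commit stages, without either of them having to declare a GlobalCommitter type parameter.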



The FLIP document is also updated and includes the proposed changes.



Looking forward to your feedback,

Fabian



https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction





On Thu, Dec 2, 2021 at 10:15 AM Roman Khachatryan  wrote:

>

> Thanks for clarifying (I was initially confused by merging state files

> rather than output files).

>

> > At some point, Flink will definitely have some WAL adapter that can turn 
> > any sink into an exactly-once sink (with some caveats). For now, we keep 
> > that as an orthogonal solution as it has a rather high price (bursty 
> > workload with high latency). Ideally, we can keep the compaction 
> > asynchronously...

>

> Yes, that would be something like a WAL. I agree that it would have a

> different set of trade-offs.

>

>

> Regards,

> Roman

>

> On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise  wrote:

> >>

> >> > One way to avoid write-read-merge is by wrapping SinkWriter with

> >> > another one, which would buffer input elements in a temporary storage

> >> > (e.g. local file) until a threshold is reached; after that, it would

> >> > invoke the original SinkWriter. And if a checkpoint barrier comes in

> >> > earlier, it would send written data to some aggregator.

> >>

> >> I think perhaps this seems to be a kind of WAL method? Namely we first

> >> write the elements to some WAL logs and persist them on checkpoint

> >> (in snapshot or remote FS), or we directly write WAL logs to the remote

> >> FS eagerly.

> >>

> > At some point, Flink will definitely have some WAL adapter that can turn 
> > any sink into an exactly-once sink (with some caveats). For now, we keep 
> > that as an orthogonal solution as it has a rather high price (bursty 
> > workload with high latency). Ideally, we can keep the compaction 
> > asynchronously...

> >

> > On Mon, Nov 29, 2021 at 8:52 AM Yun Gao  wrote:

> >>

> >> Hi,

> >>

> >> @Roman, very sorry for the long-delayed response,

> >>

> >> > Merging artifacts from multiple checkpoints would apparently

> >> require multiple concurrent checkpoints

> >>

> >> I think it might not need concurrent checkpoints: suppose some

> >> operator (like the committer aggregator in option 2) maintains

> >> the list of files to merge. It could store this list

> >> in its state, and after several checkpoints are done and we have

> >> enough files, we could merge all the files in the list.

> >>

> >> > Asynchronous merging in an aggregator would require some resolution

> >> > logic on recovery, so that a merged artifact can be used if the

> >> > original one was deleted. Otherwise, wouldn't recovery fail because

> >> > some artifacts are missing?

> >> > We could also defer deletion until the "compacted" checkpoint is

> >> > subsumed - but isn't it too late, as it will be deleted anyways once

> >> > subsumed?

> >>

> >> I think logically we could delete the original files once the "compacted"

> >> checkpoint (which finishes merging the compacted files and records them in

> >> the checkpoint) is completed, in all the options. If there is a failover

> >> before it completes, we could restart the merging, and if there is a failover

> >> after it, we would have already recorded the files in the checkpoint.

> >>

> >> > One way to avoid write-read-merge is by wrapping SinkWriter with

> >> > another one, which would buffer input elements in a temporary storage

> >> > (e.g. local file) until a threshold is reached; after that, it would

> >> > invoke the original SinkWriter. And if a checkpoint barrier comes in

> >> > earlier, it would send written data to some aggregator.

> >>

> >> I think perhaps this seems to be a kind of WAL method? Namely we first

> >> write the elements to some WAL logs and persist them on checkpoint

> >> (in snapshot or remote FS), or we directly write WAL logs to the remote

> >> FS eagerly.

> >>

> >> Sorry if I do not understand correctly somewhere.

> >>

> >> Best,

> >> Yun

> >>

> >>

> >> ------------------------------------------------------------------

> >> From:Roman Khachatryan 

> >> Send Time:2021 Nov. 9 (Tue.) 22:03

> >> To:dev 

> >> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support 
> >> small file compaction

> >>

> >> Hi everyone,

> >>

> >> Thanks for the proposal and the discussion, I have some remarks:

> >> (I'm not very familiar with the new Sink API but I thought about the

> >> same problem in context of the changelog state backend)

> >>

> >> 1. Merging artifacts from multiple checkpoints would apparently

> >> require multiple concurrent checkpoints (otherwise, a new checkpoint

> >> won't be started before completing the previous one; and the previous

> >> one can't be completed before durably storing the artifacts). However,

> >> concurrent checkpoints are currently not supported with Unaligned

> >> checkpoints (this is besides increasing e2e-latency).

> >>

> >> 2. Asynchronous merging in an aggregator would require some resolution

> >> logic on recovery, so that a merged artifact can be used if the

> >> original one was deleted. Otherwise, wouldn't recovery fail because

> >> some artifacts are missing?

> >> We could also defer deletion until the "compacted" checkpoint is

> >> subsumed - but isn't it too late, as it will be deleted anyways once

> >> subsumed?

> >>

> >> 3. Writing small files, then reading and merging them for *every*

> >> checkpoint seems worse than only reading them on recovery. I guess I'm

> >> missing some cases of reading, so to me it would make sense to mention

> >> these cases explicitly in the FLIP motivation section.

> >>

> >> 4. One way to avoid write-read-merge is by wrapping SinkWriter with

> >> another one, which would buffer input elements in a temporary storage

> >> (e.g. local file) until a threshold is reached; after that, it would

> >> invoke the original SinkWriter. And if a checkpoint barrier comes in

> >> earlier, it would send written data to some aggregator. It will

> >> increase checkpoint delay (async phase) compared to the current Flink;

> >> but not compared to the write-read-merge solution, IIUC.

> >> Then such "BufferingSinkWriters" could aggregate input elements from

> >> each other, potentially recursively (I mean something like

> >> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png

> >> )

> >>

> >> 5. Reducing the number of files by reducing aggregator parallelism as

> >> opposed to merging on reaching size threshold will likely be less

> >> optimal and more difficult to configure. OTOH, thresholds might be more

> >> difficult to implement and (with recursive merging) would incur higher

> >> latency. Maybe that's also something to decide explicitly or at least

> >> mention in the FLIP.

> >>

> >>

> >>

> >> Regards,

> >> Roman

> >>

> >>

> >> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei  wrote:

> >> >

> >> > Hi Fabian,

> >> >

> >> > Thanks for drafting the FLIP and trying to support small file 
> >> > compaction. I

> >> > think this feature is very urgent and valuable for users(at least for 
> >> > me).

> >> >

> >> > Currently I am trying to support streaming rewrite (compaction) for Iceberg

> >> > in PR#3323. As Steven mentioned, Iceberg sinks and compacts data

> >> > through the following steps:

> >> > Step-1: Some parallel data writers (sinkers) write streaming data as files.

> >> > Step-2: A single-parallelism data file committer commits the completed

> >> > files as soon as possible to make them available.

> >> > Step-3: Some parallel file rewriters (compactors) collect committed files

> >> > from multiple checkpoints and rewrite (compact) them together once the

> >> > total file size or number of files reaches the threshold.

> >> > Step-4: A single-parallelism rewrite (compact) result committer commits

> >> > the rewritten (compacted) files to replace the old files and make them

> >> > available.

> >> >

> >> >

> >> > If Flink wants to support small file compaction, I think the following key

> >> > points are necessary:

> >> >

> >> > 1, Compact files from multiple checkpoints.

> >> > I totally agree with Jingsong, because completed file size usually could

> >> > not reach the threshold in a single checkpoint. Especially for 
> >> > partitioned

> >> > table, we need to compact the files of each partition, but usually the 
> >> > file

> >> > size of each partition will be different and may not reach the merge

> >> > threshold. If we compact these files, in a single checkpoint, regardless 
> >> > of

> >> > whether the total file size reaches the threshold, then the value of

> >> > compacting will be diminished and we will still get small files because

> >> > these compacted files do not reach the target size. So we need the

> >> > compactor to collect committed files from multiple checkpoints and 
> >> > compact

> >> > them until they reach the threshold.

> >> >

> >> > 2, Separate write phase and compact phase.

> >> > Users usually hope the data becomes available as soon as possible, and 
> >> > the

> >> > end-to-end latency is very important. I think we need to separate the

> >> > write and compact phases. The write phase includes Step-1

> >> > and Step-2: we sink data as files and commit them per checkpoint,

> >> > regardless of the file size. That ensures the data will be

> >> > available ASAP. The compact phase includes Step-3

> >> > and Step-4: the compactor should collect committed files from multiple

> >> > checkpoints and compact them asynchronously once they reach the 
> >> > threshold,

> >> > and the compact committer will commit the compaction result in the next

> >> > checkpoint. We compact the committed files asynchronously because we 
> >> > don't

> >> > want the compaction to affect the data sink or the whole pipeline.

> >> >

> >> > 3, Exactly once guarantee between write and compact phase.

> >> > Once we separate the write phase and the compact phase, we need to consider

> >> > how to guarantee exactly-once semantics between the two phases. We should

> >> > never lose any data or files in the compactor (Step-3), as that would make the

> >> > compaction result inconsistent with the data before compaction. I think Flink

> >> > should provide an easy-to-use interface to make that easier.

> >> >

> >> > 4, Metadata operation and compaction result validation.

> >> > The compact phase may involve not only compacting files, but also a lot

> >> > of metadata operations, such as Iceberg needing to read/write manifests

> >> > and do MOR. We also need some interface that lets users

> >> > validate the compaction result. I think these points should be

> >> > considered when we design the compaction API.

> >> >

> >> >

> >> > Back to FLIP-191, option 1 looks very complicated while option 2 is

> >> > relatively simple, but neither of these two solutions separates the write

> >> > phase from the compact phase. So I think we should consider the points I

> >> > mentioned above. And if you have any other questions you can always feel

> >> > free to reach out to me!

> >> >

> >> > BR,

> >> > Reo

> >> >

> >> > On Mon, Nov 8, 2021 at 7:59 PM Fabian Paul  wrote:

> >> >

> >> > > Hi all,

> >> > >

> >> > > Thanks for the lively discussions. I am really excited to see so many

> >> > > people

> >> > > participating in this thread. It also underlines the need that many 
> >> > > people

> >> > > would

> >> > > like to see a solution soon.

> >> > >

> >> > > I have updated the FLIP and removed the parallelism configuration 
> >> > > because

> >> > > it is

> >> > > unnecessary since users can configure a constant exchange key to send 
> >> > > all

> >> > > committables to only one committable aggregator.

> >> > >

> >> > >

> >> > > 1. Burden for developers w.r.t batch stream unification.

> >> > >

> >> > > @yun @guowei, from a theoretical point of view you are right that by

> >> > > exposing the DataStream API in the sink, users have the full power to write

> >> > > correct batch and streaming sinks. I think in reality a lot of users still

> >> > > struggle to build pipelines, e.g. an operator pipeline, that work correctly

> >> > > in both streaming and batch mode. Another problem I see with exposing

> >> > > deeper concepts is that we cannot do any optimization, because we cannot

> >> > > reason about how sinks will be built in the future.

> >> > >

> >> > > We should also try to steer users towards using only `Functions` to 
> >> > > give

> >> > > us more

> >> > > flexibility to swap the internal operator representation. I agree with

> >> > > @yun we

> >> > > should try to make the `ProcessFunction` more versatile to work on that

> >> > > goal but

> >> > > I see this as unrelated to the FLIP.

> >> > >

> >> > >

> >> > > 2. Regarding Commit / Global commit

> >> > >

> >> > > I envision the global committer to be specific depending on the data 
> >> > > lake

> >> > > solution you want to write to. However, it is entirely orthogonal to 
> >> > > the

> >> > > compaction.

> >> > > Currently, I do not expect any changes w.r.t. the global commit to be

> >> > > introduced by this FLIP.

> >> > >

> >> > >

> >> > > 3. Regarding the case of trans-checkpoints merging

> >> > >

> >> > > @yun, as a user, I would expect that if the committer receives files to

> >> > > merge/commit in a checkpoint, these are also finished when the checkpoint

> >> > > finishes. I think all sinks currently rely on this principle, e.g. KafkaSink

> >> > > needs to commit all open transactions before the next checkpoint can happen.

> >> > >

> >> > > Maybe in the future, we can somehow move the Committer#commit call to 
> >> > > an

> >> > > asynchronous execution, but we should discuss it as a separate thread.

> >> > >

> >> > > > We probably should first describe the different causes of small 
> >> > > > files and

> >> > > > what problems was this proposal trying to solve. I wrote a data 
> >> > > > shuffling

> >> > > > proposal [1] for Flink Iceberg sink (shared with Iceberg community 
> >> > > > [2]).

> >> > > It

> >> > > > can address small files problems due to skewed data distribution 
> >> > > > across

> >> > > > Iceberg table partitions. Streaming shuffling before writers (to 
> >> > > > files)

> >> > > is

> >> > > > typically more efficient than post-write file compaction (which 
> >> > > > involves

> >> > > > read-merge-write). It is usually cheaper to prevent a problem (small

> >> > > files)

> >> > > > than fixing it.

> >> > >

> >> > >

> >> > > @steven you are raising a good point, although I think only using a

> >> > > customizable

> >> > > shuffle won't address the generation of small files. One assumption is 
> >> > > that

> >> > > at least the sink generates one file per subtask, which can already be 
> >> > > too

> >> > > many.

> >> > > Another problem is that with low checkpointing intervals, the files do 
> >> > > not

> >> > > meet

> >> > > the required size. The latter point is probably addressable by 
> >> > > changing the

> >> > > checkpoint interval, which might be inconvenient for some users.

> >> > >

> >> > > > The sink coordinator checkpoint problem (mentioned in option 1) 
> >> > > > would be

> >> > > > great if Flink can address it. In the spirit of source

> >> > > (enumerator-reader)

> >> > > > and sink (writer-coordinator) duality, sink coordinator checkpoint 
> >> > > > should

> >> > > > happen after the writer operator. This would be a natural fit to 
> >> > > > support

> >> > > > global committer in FLIP-143. It is probably an orthogonal matter to 
> >> > > > this

> >> > > > proposal.

> >> > >

> >> > >

> >> > > To me the question here is what are the benefits of having a 
> >> > > coordinator in

> >> > > comparison to a global committer/aggregator operator.

> >> > >

> >> > > > Personally, I am usually in favor of keeping streaming ingestion (to 
> >> > > > data

> >> > > > lake) relatively simple and stable. Also sometimes compaction and 
> >> > > > sorting

> >> > > > are performed together in data rewrite maintenance jobs to improve 
> >> > > > read

> >> > > > performance. In that case, the value of compacting (in Flink 
> >> > > > streaming

> >> > > > ingestion) diminishes.

> >> > >

> >> > >

> >> > > I agree it is always possible to have scheduled maintenance jobs taking

> >> > > care of your data, e.g. doing compaction. Unfortunately, the downside is that

> >> > > you have to compact your data after it is already available to other

> >> > > downstream consumers.

> >> > > I guess this can lead to all kinds of visibility problems. I am also

> >> > > surprised that

> >> > > you personally are a fan of this approach and, on the other hand, are

> >> > > developing

> >> > > the Iceberg sink, which goes somewhat against your mentioned principle 
> >> > > of

> >> > > keeping

> >> > > the sink simple.

> >> > >

> >> > > > Currently, it is unclear from the doc and this thread where the

> >> > > compaction

> >> > > > is actually happening. Jingsong's reply described one model

> >> > > > writer (parallel) -> aggregator (single-parallelism compaction 
> >> > > > planner)

> >> > > ->

> >> > > > compactor (parallel) -> global committer (single-parallelism)

> >> > >

> >> > >

> >> > > My idea of the topology is very similar to the one outlined by 
> >> > > Jingsong. The

> >> > > compaction will happen in the committer operator.

> >> > >

> >> > > >

> >> > > > In the Iceberg community, the following model has been discussed. It 
> >> > > > is

> >> > > > better for Iceberg because it won't delay the data availability.

> >> > > > writer (parallel) -> global committer for append (single 
> >> > > > parallelism) ->

> >> > > > compactor (parallel) -> global committer for rewrite commit (single

> >> > > > parallelism)

> >> > >

> >> > >

> >> > > From a quick glimpse, it seems that the exact same topology is 
> >> > > possible to

> >> > > express with the committable aggregator, but this definitely depends on

> >> > > the exact

> >> > > setup.

> >> > >

> >> > > Best,

> >> > > Fabian

> >>
