Hi, Krzysztof Chmielewski [1] from the Delta-Flink connector open source community [2] here.
I totally agree with Steven on this. Sink V1's GlobalCommitter is exactly what the Flink-Delta Sink needs, since it is the place where we do the actual commit to the Delta Log, which should be done from a single place/instance.

Currently I'm evaluating V2 for our connector, and having, as Steven described it, a "more natural, built-in concept/support of GlobalCommitter in the sink v2 interface" would be greatly appreciated.

Cheers,
Krzysztof Chmielewski

[1] https://github.com/kristoffSC
[2] https://github.com/delta-io/connectors/tree/master/flink

Thu, 8 Sep 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:

> Hi Yun,
>
> Thanks a lot for the reply!
>
> While we can add the global committer in the WithPostCommitTopology, the
> semantics are weird. The Commit stage actually didn't commit anything to
> the Iceberg table, and the PostCommit stage is where the Iceberg commit
> happens.
>
> I just took a quick look at DeltaLake Flink sink. It still uses the V1 sink
> interface [1]. I think it might have the same issue when switching to the
> V2 sink interface.
>
> For data lake storages (like Iceberg, DeltaLake) or any storage with global
> transactional commit, it would be more natural to have a built-in
> concept/support of GlobalCommitter in the sink v2 interface.
>
> Thanks,
> Steven
>
> [1]
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>
> On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
> > Hi Steven, Liwei,
> > Very sorry for missing this mail and responding very late.
> > I think the initial thought is indeed to use `WithPostCommitTopology` as
> > a replacement of the original GlobalCommitter, and currently the adapter of
> > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1 interface
> > onto an implementation of `WithPostCommitTopology`.
> > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems to
> > me it could support both the global committer and small file compaction? We
> > might have a `WithPostCommitTopology` implementation like
> >
> >     DataStream ds = add global committer;
> >     if (enable file compaction) {
> >         build the compaction subgraph from ds
> >     }
> >
> > Best,
> > Yun
> >
> > [1]
> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> >
> > ------------------------------------------------------------------
> > From: Steven Wu <stevenz...@gmail.com>
> > Send Time: 2022 Aug. 17 (Wed.) 07:30
> > To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > Subject: Re: Sink V2 interface replacement for GlobalCommitter
> >
> > > Plus, it will disable the future capability of small file compaction
> > > stage post commit.
> >
> > I should clarify this comment. If we are using the `WithPostCommitTopology`
> > for the global committer, we would lose the capability of using the post
> > commit stage for small file compaction.
> >
> > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the
> > > V2 sink interface, GlobalCommitter has been deprecated by
> > > WithPostCommitTopology. I thought the post commit stage is mainly for async
> > > maintenance (like compaction).
> > >
> > > Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
> > > It seems like a temporary transition plan for bridging v1 sinks to v2
> > > interfaces.
> > >
> > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > >         implements WithPostCommitTopology<InputT, CommT> {
> > >     @Override
> > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
> > >         StandardSinkTopologies.addGlobalCommitter(
> > >                 committables,
> > >                 GlobalCommitterAdapter::new,
> > >                 () -> sink.getCommittableSerializer().get());
> > >     }
> > > }
> > >
> > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> > > "global" partitioner to force all committables go to a single committer
> > > task 0. It will effectively force a global committer disguised in the
> > > parallel committers. It is a little weird and also can lead to questions
> > > why other committer tasks are not getting any messages. Plus, it will
> > > disable the future capability of small file compaction stage post commit.
> > > Hence, I am asking what is the right approach to achieve global committer
> > > behavior.
> > >
> > > Thanks,
> > > Steven
> > >
> > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
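
For readers skimming the thread, the behaviour being asked for (gather the committables produced by all parallel subtasks, then issue exactly one commit per checkpoint from a single instance) can be modelled without any Flink dependency. The sketch below is only an illustration under stated assumptions: `GlobalCommitSketch`, `Committable`, `TableLog` and `GlobalCommitter` are hypothetical names invented here, not Flink, Iceberg or Delta classes, and failure recovery and serialization are not modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Flink-free model of "one global commit per checkpoint". */
final class GlobalCommitSketch {

    /** One pending data file reported by a writer subtask for a checkpoint. */
    static final class Committable {
        final long checkpointId;
        final int subtaskId;
        final String dataFile;

        Committable(long checkpointId, int subtaskId, String dataFile) {
            this.checkpointId = checkpointId;
            this.subtaskId = subtaskId;
            this.dataFile = dataFile;
        }
    }

    /** Stand-in for a table's transaction log (assumption: append-only). */
    static final class TableLog {
        private final List<List<String>> transactions = new ArrayList<>();

        /** Records one atomic transaction containing the given files. */
        void commit(List<String> files) {
            transactions.add(Collections.unmodifiableList(new ArrayList<>(files)));
        }

        List<List<String>> transactions() {
            return Collections.unmodifiableList(transactions);
        }
    }

    /** The single instance that sees committables from every subtask. */
    static final class GlobalCommitter {
        private final TableLog log;
        // Sorted by checkpoint id so older checkpoints are committed first.
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();

        GlobalCommitter(TableLog log) {
            this.log = log;
        }

        /** Buffers a committable until its checkpoint completes. */
        void collect(Committable c) {
            pending.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c.dataFile);
        }

        /** Commits each buffered checkpoint up to checkpointId as one transaction. */
        void notifyCheckpointComplete(long checkpointId) {
            Iterator<Map.Entry<Long, List<String>>> it =
                    pending.headMap(checkpointId, true).entrySet().iterator();
            while (it.hasNext()) {
                log.commit(it.next().getValue());
                it.remove(); // also removes the entry from the backing map
            }
        }
    }
}
```

In this model, two subtasks reporting files for checkpoint 1 produce a single transaction in `TableLog` once `notifyCheckpointComplete(1)` is called, which is the property the thread wants from the sink interface itself rather than from a post-commit stage.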