Hi,
Krzysztof Chmielewski [1] from the Delta-Flink connector open source
community here [2].

I totally agree with Steven on this. Sink V1's GlobalCommitter is exactly
what the Flink-Delta Sink needs, since it is the place where we do the
actual commit to the Delta Log, which should be done from a single
place/instance.

I'm currently evaluating V2 for our connector, and having, as Steven
described it, a "more natural, built-in concept/support of GlobalCommitter
in the sink v2 interface" would be greatly appreciated.
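
To make the "single place/instance" requirement concrete, here is a minimal
plain-Java sketch. It is not Flink or Delta code: GlobalCommitSketch,
Committable, InMemoryLog and GlobalCommitter are all hypothetical names made
up for illustration. It only shows the shape we need: many parallel writers
hand their committables to one instance, which writes one log version per
checkpoint.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration only: none of these types are Flink or Delta APIs.
final class GlobalCommitSketch {

    /** Hypothetical committable: one data file written by one parallel writer subtask. */
    static final class Committable {
        final int subtaskId;
        final long checkpointId;
        final String filePath;

        Committable(int subtaskId, long checkpointId, String filePath) {
            this.subtaskId = subtaskId;
            this.checkpointId = checkpointId;
            this.filePath = filePath;
        }
    }

    /** Hypothetical stand-in for a transactional table log such as the Delta Log. */
    static final class InMemoryLog {
        private final List<List<String>> versions = new ArrayList<>();

        /** Appends one version containing all given files and returns its version number. */
        int commit(List<String> files) {
            versions.add(List.copyOf(files));
            return versions.size() - 1;
        }

        List<List<String>> versions() {
            return versions;
        }
    }

    /** Single instance that receives committables from every subtask. */
    static final class GlobalCommitter {
        private final InMemoryLog log;
        private final List<Committable> pending = new ArrayList<>();

        GlobalCommitter(InMemoryLog log) {
            this.log = log;
        }

        void collect(Committable committable) {
            pending.add(committable);
        }

        /**
         * Commits all pending files up to and including checkpointId as ONE log version.
         * Returns the new version number, or -1 if there was nothing to commit.
         */
        int commitCheckpoint(long checkpointId) {
            List<String> files = new ArrayList<>();
            Iterator<Committable> it = pending.iterator();
            while (it.hasNext()) {
                Committable c = it.next();
                if (c.checkpointId <= checkpointId) {
                    files.add(c.filePath);
                    it.remove();
                }
            }
            return files.isEmpty() ? -1 : log.commit(files);
        }
    }

    public static void main(String[] args) {
        InMemoryLog log = new InMemoryLog();
        GlobalCommitter committer = new GlobalCommitter(log);
        // Three parallel writer subtasks each produce one file for checkpoint 1.
        for (int subtask = 0; subtask < 3; subtask++) {
            committer.collect(new Committable(subtask, 1L, "part-" + subtask + ".parquet"));
        }
        int version = committer.commitCheckpoint(1L);
        System.out.println("version=" + version + " files=" + log.versions().get(version));
    }
}
```

The point of the sketch is that three writers still produce exactly one log
version for the checkpoint, and that a repeated commit call for the same
checkpoint writes nothing.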

Cheers,
Krzysztof Chmielewski

[1] https://github.com/kristoffSC
[2] https://github.com/delta-io/connectors/tree/master/flink

On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:

> Hi Yun,
>
> Thanks a lot for the reply!
>
> While we can add the global committer in the WithPostCommitTopology, the
> semantics are weird. The Commit stage doesn't actually commit anything to
> the Iceberg table, and the PostCommit stage is where the Iceberg commit
> happens.
>
> I just took a quick look at DeltaLake Flink sink. It still uses the V1 sink
> interface [1]. I think it might have the same issue when switching to the
> V2 sink interface.
>
> For data lake storages (like Iceberg, DeltaLake) or any storage with global
> transactional commit, it would be more natural to have a built-in
> concept/support of GlobalCommitter in the sink v2 interface.
>
> Thanks,
> Steven
>
> [1]
>
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>
>
> On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
> > Hi Steven, Liwei,
> >
> > Very sorry for missing this mail and responding so late.
> >
> > I think the initial thought was indeed to use `WithPostCommitTopology` as
> > a replacement for the original GlobalCommitter, and currently the adapter
> > of Sink V1 on top of Sink V2 also maps the GlobalCommitter in the Sink V1
> > interface onto an implementation of `WithPostCommitTopology`.
> >
> > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems to
> > me it could support both the global committer and small file compaction?
> > We might have a `WithPostCommitTopology` implementation like
> >
> > DataStream ds = add global committer;
> > if (enable file compaction) {
> >     build the compaction subgraph from ds
> > }
> >
> > Best,
> > Yun
> > [1]
> > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > ------------------------------------------------------------------
> > From:Steven Wu <stevenz...@gmail.com>
> > Send Time:2022 Aug. 17 (Wed.) 07:30
> > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > Subject:Re: Sink V2 interface replacement for GlobalCommitter
> > > Plus, it will disable the future capability of small file compaction
> > > stage post commit.
> >
> > I should clarify this comment. If we are using the `WithPostCommitTopology`
> > for the global committer, we would lose the capability of using the post
> > commit stage for small file compaction.
> >
> > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the
> > > V2 sink interface, GlobalCommitter has been deprecated in favor of
> > > WithPostCommitTopology. I thought the post commit stage is mainly for async
> > > maintenance (like compaction).
> > >
> > > Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
> > > It seems like a temporary transition plan for bridging v1 sinks to v2
> > > interfaces.
> > >
> > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > >         implements WithPostCommitTopology<InputT, CommT> {
> > >     @Override
> > >     public void addPostCommitTopology(
> > >             DataStream<CommittableMessage<CommT>> committables) {
> > >         StandardSinkTopologies.addGlobalCommitter(
> > >                 committables,
> > >                 GlobalCommitterAdapter::new,
> > >                 () -> sink.getCommittableSerializer().get());
> > >     }
> > > }
> > >
> > >
> > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> > > "global" partitioner to force all committables to go to a single committer
> > > task 0. It effectively forces a global committer disguised among the
> > > parallel committers. It is a little weird and can also lead to questions
> > > about why the other committer tasks are not getting any messages. Plus, it
> > > will disable the future capability of a small file compaction stage post
> > > commit. Hence, I am asking what the right approach is to achieve global
> > > committer behavior.
> > >
> > > Thanks,
> > > Steven
> > >
> > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> > >
> >
>
