Thanks for the summary, @aljoscha. I agree that we should postpone the
discussion of the sink topology and first focus on the normal file sink and
the IcebergSink for Flink 1.12.

I have three little questions:

1. Maybe we could add an EOI (end-of-input) interface to the `GlobalCommitter`
so that writing a success file is possible in both batch and streaming
execution mode.
2. If we let both types of committer appear in the API at the same time, we
have to figure out how to express the relationship between the two
committers. I think the Sink API could look like the following. What do you
think?
Sink<T, CommT, CommR, ShareStateT...> {
    Writer<T, CommT, ShareStateT...> createWriter();
    Optional<Committer<CommT>> createCommitter();
    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}
3. Maybe a silly question: why do we need `commit` to return a `CommitResult`?
I think the sink developer could retry on their own. Why does the framework
need to do the retry?
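A compilable sketch of the Sink shape from question 2, to check that the generics line up. Assumptions: the type parameters are trimmed to T, CommT and GlobalCommT (CommR and the shared-state parameters are left out), and the method names write, prepareCommit and combine, plus the SinkDriver helper, are hypothetical and not Flink's actual API. The relation between the two committers is expressed through the shared CommT: the global committer consumes the same committables the writer produces.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical interfaces sketching the proposed shape; NOT Flink's actual API.
interface Writer<T, CommT> {
    void write(T element);

    // Hands over the committables produced since the last call (e.g. at a checkpoint).
    List<CommT> prepareCommit();
}

interface Committer<CommT> {
    void commit(List<CommT> committables);
}

interface GlobalCommitter<CommT, GlobalCommT> {
    // Folds the committables of all writer subtasks into one global committable.
    GlobalCommT combine(List<CommT> committables);

    void commit(List<GlobalCommT> globalCommittables);
}

interface Sink<T, CommT, GlobalCommT> {
    Writer<T, CommT> createWriter();

    Optional<Committer<CommT>> createCommitter();

    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}

// Toy driver: runs whichever committers the sink provides, the local one first.
final class SinkDriver {
    static <T, CommT, GlobalCommT> void runOneCheckpoint(
            Sink<T, CommT, GlobalCommT> sink, List<T> input) {
        Writer<T, CommT> writer = sink.createWriter();
        input.forEach(writer::write);
        List<CommT> committables = writer.prepareCommit();
        sink.createCommitter().ifPresent(c -> c.commit(committables));
        sink.createGlobalCommitter()
                .ifPresent(g -> g.commit(List.of(g.combine(committables))));
    }
}
```

A sink that only needs one of the two committers returns `Optional.empty()` for the other.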
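For question 1, a rough sketch of what an end-of-input hook on the global committer could enable. Assumptions: the EoiGlobalCommitter interface, its endOfInput() method, the PartitionCommitter class and the `_SUCCESS` marker name (a common Hadoop-style convention) are all hypothetical, and an in-memory set stands in for the file system. The marker is written from the hook rather than from commit(), so the same code path would serve batch mode and streaming mode with bounded input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical global committer with an end-of-input hook; NOT Flink's actual API.
interface EoiGlobalCommitter<GlobalCommT> {
    void commit(List<GlobalCommT> globalCommittables);

    // Called once when a bounded input is exhausted, in batch or streaming mode.
    // An unbounded streaming job would never reach it.
    void endOfInput();
}

// Toy file sink: a global committable is the name of a partition directory, and the
// "file system" is an in-memory set of paths so the sketch stays self-contained.
final class PartitionCommitter implements EoiGlobalCommitter<String> {
    final Set<String> files = new TreeSet<>();
    private final List<String> committedPartitions = new ArrayList<>();

    @Override
    public void commit(List<String> partitions) {
        committedPartitions.addAll(partitions);
    }

    @Override
    public void endOfInput() {
        // Write a success marker for every partition committed so far.
        for (String partition : committedPartitions) {
            files.add(partition + "/_SUCCESS");
        }
    }
}
```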
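For question 3, a sketch of what a framework-driven retry based on the return value might look like, for comparison with retrying inside the sink. Assumptions: the CommitResult values SUCCESS, RETRY and FAILURE, the RetryableCommitter interface and the CommitRunner helper with its maxAttempts parameter are hypothetical.

```java
import java.util.List;

// Hypothetical result type; the values in the actual proposal may differ.
enum CommitResult { SUCCESS, RETRY, FAILURE }

interface RetryableCommitter<CommT> {
    CommitResult commit(List<CommT> committables);
}

// Framework-side policy: the sink only reports what happened, and the framework
// decides how many times to try again.
final class CommitRunner {
    static <CommT> CommitResult commitWithRetries(
            RetryableCommitter<CommT> committer, List<CommT> committables, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        CommitResult result = committer.commit(committables);
        // Keep trying only while the sink asks for a retry and attempts remain.
        for (int attempt = 2; attempt <= maxAttempts && result == CommitResult.RETRY; attempt++) {
            result = committer.commit(committables);
        }
        // SUCCESS, FAILURE, or RETRY if every attempt asked for a retry.
        return result;
    }
}
```

With this split, the attempt limit lives in one place instead of in every sink; a sink that retries on its own would simply never return RETRY.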

Best,
Guowei


On Tue, Sep 22, 2020 at 4:47 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ah sorry, I think I now see what you mean. I think it's ok to add a
> `List<GlobalCommittableT> recoverCommittables(List<GlobalCommittableT>)`
> method.
>
>
> On 22.09.20 09:42, Aljoscha Krettek wrote:
> > On 22.09.20 06:06, Steven Wu wrote:
> >> In addition, it is undesirable to do the committed-or-not check in the
> >> commit method, which runs in every checkpoint cycle. CommitResult
> >> already indicates SUCCESS or not. When the framework calls commit with a
> >> list of GlobalCommittableT, it should be certain they are uncommitted. The
> >> only time we aren't sure is when a list of GlobalCommittableT is restored
> >> from a checkpoint. `recoverGlobalCommittables` is the ideal place to do
> >> such a check and filter out the ones that were already committed. The
> >> retained ones will be committed in the next checkpoint cycle. Since the
> >> framework takes care of checkpointing and restoring, we need a hook for
> >> the sink to add custom logic on the restored list.
> >
> > I think we don't need the `recoverGlobalCommittables()` hook. The sink
> > implementation would have to do the filtering once, so it can do it either
> > in the recover hook or in the next `commit()` call. Either way, we only
> > have to make one pass through the list and connect to Iceberg. Doing the
> > check in `commit()` would keep the interface of GlobalCommittable simpler,
> > and to me it seems natural to do the check in the commit() method to
> > ensure that commits are idempotent.
> >
> > What do you think?
> >
> > Aljoscha
>
>
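A toy comparison of the two places the already-committed check could live in the exchange above: a recover hook (named recoverCommittables here, after the method mentioned above) that filters the restored list once, versus an idempotent commit() that checks on every call. Assumptions: FakeTable, FilteringGlobalCommitter, the String commit IDs and the lookups counter are hypothetical stand-ins; a real Iceberg sink would rely on whatever commit metadata the table exposes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for an external table (e.g. Iceberg) that remembers which commit IDs
// it has already applied. Detecting "already committed" by ID is an assumption.
final class FakeTable {
    final Set<String> committedIds = new HashSet<>();
    int lookups = 0;

    boolean isCommitted(String id) {
        lookups++;
        return committedIds.contains(id);
    }

    void append(String id) {
        committedIds.add(id);
    }
}

final class FilteringGlobalCommitter {
    private final FakeTable table;

    FilteringGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    // Option 1 (recover hook): filter once, right after restoring from a checkpoint,
    // so that commit() can assume every committable it receives is new.
    List<String> recoverCommittables(List<String> restored) {
        List<String> pending = new ArrayList<>();
        for (String id : restored) {
            if (!table.isCommitted(id)) {
                pending.add(id);
            }
        }
        return pending;
    }

    // Paired with option 1: no lookup, every committable is appended.
    void commitUnchecked(List<String> committables) {
        committables.forEach(table::append);
    }

    // Option 2 (idempotent commit): check inside commit() on every call.
    void commitIdempotent(List<String> committables) {
        for (String id : committables) {
            if (!table.isCommitted(id)) {
                table.append(id);
            }
        }
    }
}
```

The lookups counter makes the cost difference visible: the recover hook pays for the check only after a restore, while an always-checking commit() pays on every checkpoint cycle. A commit() that checks only on its first call after a restore would behave like the recover hook.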
