Hi, Steven

>>I also have a clarifying question regarding the WriterStateT. Since
>>IcebergWriter won't need to checkpoint any state, should we set it to
>>*Void* type? Since getWriterStateSerializer() returns Optional, that is
>>clear and we can return Optional.empty().

Yes, I think you could do that. If you return Optional.empty(), we would
ignore all the state you return.
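
To make that concrete, here is a minimal sketch of the idea. It is not the
FLIP's API: `StateSerializer`, `StatelessCapableSink`, `IcebergLikeSink` and
`statesToCheckpoint` are hypothetical names made up for this example, and only
`getWriterStateSerializer()` is taken from your question. It assumes the
framework persists writer state only when a serializer is present.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a state serializer, reduced to a marker type.
interface StateSerializer<T> {}

// Hypothetical slice of a sink interface: only the writer-state part.
interface StatelessCapableSink<WriterStateT> {
    Optional<StateSerializer<WriterStateT>> getWriterStateSerializer();
}

// A sink whose writer keeps no checkpointed state:
// WriterStateT is Void and no serializer is provided.
final class IcebergLikeSink implements StatelessCapableSink<Void> {
    @Override
    public Optional<StateSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}

final class VoidStateSketch {
    // Assumed framework behaviour: without a serializer, whatever state the
    // writer hands back is dropped, so nothing is checkpointed.
    static <S> int statesToCheckpoint(StatelessCapableSink<S> sink, List<S> snapshot) {
        return sink.getWriterStateSerializer().isPresent() ? snapshot.size() : 0;
    }

    public static void main(String[] args) {
        // The only value a Void-typed state can take is null.
        List<Void> snapshot = Collections.singletonList(null);
        System.out.println(statesToCheckpoint(new IcebergLikeSink(), snapshot));
    }
}
```

With `Void` as the state type, the writer has nothing meaningful to return
anyway, so the empty Optional and the type parameter say the same thing.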

Best,
Guowei


On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Steven
>
> Thank you for reading the FLIP so carefully.
> 1. The framework cannot know which `GlobalCommT` to retry if `commit`
> takes a List<GlobalCommT> as its parameter and returns `RETRY`.
> 2. Of course, we could let `commit` return more detailed information, but
> that might be too complicated.
> 3. On the other hand, I think it is only when restoring that IcebergSink
> needs to take a collection of `GlobalCommT` and give back another
> collection of `GlobalCommT` that are not yet committed.
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> Guowei,
>>
>> Thanks a lot for updating the wiki page. It looks great.
>>
>> I noticed one inconsistency in the wiki with your last summary email for
>> GlobalCommitter interface. I think the version in the summary email is the
>> intended one, because rollover from previous failed commits can accumulate
>> a list.
>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>> =>
>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
>> summary email
>>
>> I also have a clarifying question regarding the WriterStateT. Since
>> IcebergWriter won't need to checkpoint any state, should we set it to
>> *Void*
>> type? Since getWriterStateSerializer() returns Optional, that is clear and
>> we can return Optional.empty().
>>
>> Thanks,
>> Steven
>>
>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>> > Thanks Aljoscha for your suggestion. I have updated the FLIP. Any
>> > comments are welcome.
>> >
>> > Best,
>> > Guowei
>> >
>> >
>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
>> > wrote:
>> >
>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>> > > about the names of generic parameters and the Javadoc but we can
>> > > address them later or during implementation.
>> > >
>> > > I also think that we probably need the FAIL, RETRY, SUCCESS result for
>> > > globalCommit() but we can also do that as a later addition.
>> > >
>> > > So I think we're good to go to update the FLIP, do any last minute
>> > > changes and then vote.
>> > >
>> > > Best,
>> > > Aljoscha
>> > >
>> > > On 23.09.20 06:13, Guowei Ma wrote:
>> > > > Hi, all
>> > > >
>> > > > Thank you all very much for your ideas and suggestions. I will try to
>> > > > summarize the consensus again :). Correct me if I am wrong or have
>> > > > misunderstood you.
>> > > >
>> > > > ## Consensus-1
>> > > >
>> > > > 1. The motivation of the unified sink API is to decouple the sink
>> > > > implementation from the different runtime execution modes.
>> > > > 2. The initial scope of the unified sink API only covers
>> > > > file-system-type sinks, which support real transactions. The FLIP
>> > > > focuses more on the semantics the new sink API should support.
>> > > > 3. We prefer the first alternative API, which could give the
>> > > > framework a greater opportunity to optimize.
>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would be
>> > > > called from `prepareSnapshotPreBarrier`, and the `Flush` method is
>> > > > removed.
>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
>> > > > more focused.
>> > > >
>> > > > ## Consensus-2
>> > > >
>> > > > 1. What should the “Unified Sink API” support/cover? It includes two
>> > > > aspects: (a) the same sink implementation should work for both the
>> > > > batch and stream execution modes; (b) in the long run, we should give
>> > > > the sink developer the ability to build “arbitrary” topologies. But
>> > > > for Flink 1.12 we should focus only on satisfying the S3/HDFS/Iceberg
>> > > > sinks.
>> > > > 2. Because the batch execution mode does not have normal checkpoints,
>> > > > the sink developer should no longer depend on them if we want a
>> > > > unified sink.
>> > > > 3. We could benefit from providing an asynchronous `Writer` version.
>> > > > But because the unified sink is already very complicated, we don’t
>> > > > add this in the first version.
>> > > >
>> > > >
>> > > > Based on this consensus, I would propose the first version of the new
>> > > > sink API as follows. What do you think? Any comments are welcome.
>> > > >
>> > > > /**
>> > > >   * This interface lets the sink developer build a simple
>> > > >   * transactional sink topology pattern, which satisfies the
>> > > >   * HDFS/S3/Iceberg sink.
>> > > >   * This sink topology includes one {@link Writer} + one
>> > > >   * {@link Committer} + one {@link GlobalCommitter}.
>> > > >   * The {@link Writer} is responsible for producing the committable.
>> > > >   * The {@link Committer} is responsible for committing a single
>> > > >   * committable.
>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>> > > >   * aggregated committable, which we call a global committable.
>> > > >   *
>> > > >   * But both the {@link Committer} and the {@link GlobalCommitter}
>> > > >   * are optional.
>> > > >   */
>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>> > > >
>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
>> > > >
>> > > >          Writer<IN, CommT, WriterS> restoreWriter(
>> > > >                  InitContext initContext, List<WriterS> states);
>> > > >
>> > > >          Optional<Committer<CommT>> createCommitter();
>> > > >
>> > > >          Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
>> > > >
>> > > >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
>> > > >
>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
>> > > >                  getGlobalCommittableSerializer();
>> > > > }
>> > > >
>> > > > /**
>> > > >   * The {@link GlobalCommitter} is responsible for committing an
>> > > >   * aggregated committable, which we call a global committable.
>> > > >   */
>> > > > interface GlobalCommitter<CommT, GCommT> {
>> > > >
>> > > >          /**
>> > > >           * This method is called when restoring from a failover.
>> > > >           * @param globalCommittables the global committables that
>> > > >           * were not committed in the previous session.
>> > > >           * @return the global committables that should be committed
>> > > >           * again in the current session.
>> > > >           */
>> > > >          List<GCommT> filterRecoveredCommittables(
>> > > >                  List<GCommT> globalCommittables);
>> > > >
>> > > >          /**
>> > > >           * Compute an aggregated committable from a collection of
>> > > >           * committables.
>> > > >           * @param committables a collection of committables that
>> > > >           * need to be combined
>> > > >           * @return an aggregated committable
>> > > >           */
>> > > >          GCommT combine(List<CommT> committables);
>> > > >
>> > > >          void commit(List<GCommT> globalCommittables);
>> > > >
>> > > >          /**
>> > > >           * There are no committables any more.
>> > > >           */
>> > > >          void endOfInput();
>> > > > }
>> > > >
>> > > > Best,
>> > > > Guowei
>> > >
>> > >
>> >
>>
>
