Yes, that sounds good! I'll probably have some comments on the FLIP about the names of generic parameters and the Javadoc but we can address them later or during implementation.

I also think that we probably need the FAIL, RETRY, SUCCESS result for globalCommit(), but we can also do that as a later addition.
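
To make the idea concrete, here is a minimal sketch of what such a result could look like. Everything in it is an assumption for illustration: `CommitResult`, `ResultReportingGlobalCommitter` and `CommitDriver` are hypothetical names and are not part of the FLIP or of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical result type; the FLIP does not define it. */
enum CommitResult { SUCCESS, RETRY, FAIL }

/** Hypothetical variant of a global commit call that reports an outcome per committable. */
interface ResultReportingGlobalCommitter<GCommT> {
    CommitResult commit(GCommT globalCommittable);
}

/** Hypothetical framework-side helper that reacts to the reported outcome. */
final class CommitDriver {
    /**
     * Commits every pending committable once.
     * Returns the committables that asked for a retry; throws on FAIL.
     */
    static <G> List<G> commitAll(ResultReportingGlobalCommitter<G> committer, List<G> pending) {
        List<G> retry = new ArrayList<>();
        for (G committable : pending) {
            switch (committer.commit(committable)) {
                case SUCCESS:
                    break; // nothing left to do for this committable
                case RETRY:
                    retry.add(committable); // keep it for a later attempt
                    break;
                case FAIL:
                    throw new IllegalStateException("Commit failed for " + committable);
            }
        }
        return retry;
    }
}
```

With a shape like this the framework, rather than each sink, would decide how to handle retries and failures.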

So I think we're good to go: update the FLIP, make any last-minute changes, and then vote.

Best,
Aljoscha

On 23.09.20 06:13, Guowei Ma wrote:
Hi, all

Thank you all very much for your ideas and suggestions. I will try to
summarize the consensus again :). Please correct me if I am wrong or have
misunderstood you.

## Consensus-1

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers file-system-type
sinks, which support real transactions. The FLIP focuses more on the
semantics the new sink API should support.
3. We prefer the first alternative API, which gives the framework a
greater opportunity to optimize.
4. The `Writer` needs a new method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`. The `Flush` method is removed.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.
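
Point 4 can be illustrated with a minimal sketch. It assumes a simplified writer shape: `SketchWriter` and `BufferingWriter` are hypothetical names, and the real `Writer` in the proposal has additional type parameters and methods that are left out here.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical writer shape; only write and prepareCommit are shown. */
interface SketchWriter<IN, CommT> {
    void write(IN element);

    /** Hands over everything written so far as committables. */
    List<CommT> prepareCommit();
}

/** Toy writer: buffers records and turns each non-empty buffer into one committable. */
final class BufferingWriter implements SketchWriter<String, String> {
    private final List<String> buffer = new ArrayList<>();
    private int fileIndex = 0;

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!buffer.isEmpty()) {
            // The committable is a made-up "file name:record count" string.
            committables.add("part-" + fileIndex + ":" + buffer.size());
            fileIndex++;
            buffer.clear();
        }
        return committables;
    }
}
```

In this sketch the writer has no separate flush step: whatever was written since the previous call leaves the writer as a committable when `prepareCommit` is invoked.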

## Consensus-2

1. What should the “Unified Sink API” support/cover? It includes two
aspects: (a) The same sink implementation should work for both the batch
and stream execution modes. (b) In the long run we should give the sink
developer the ability to build “arbitrary” topologies. But for Flink 1.12
we should focus only on satisfying the S3/HDFS/Iceberg sinks.
2. Because the batch execution mode does not have normal checkpoints, the
sink developer should no longer depend on them if we want a unified sink.
3. We could benefit from providing an asynchronous `Writer` version. But
because the unified sink is already very complicated, we will not add this
in the first version.


Based on this consensus, I would propose the first version of the new
sink API as follows. What do you think? Any comments are welcome.

/**
 * This interface lets the sink developer build a simple transactional sink
 * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} +
 * one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committables.
 * The {@link Committer} is responsible for committing a single
 * committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call a global committable.
 *
 * Both the {@link Committer} and the {@link GlobalCommitter} are
 * optional.
 */
interface TSink<IN, CommT, GCommT, WriterS> {

         Writer<IN, CommT, WriterS> createWriter(InitContext initContext);

         Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);

         Optional<Committer<CommT>> createCommitter();

         Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

         SimpleVersionedSerializer<CommT> getCommittableSerializer();

         Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
}

/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call a global committable.
 */
interface GlobalCommitter<CommT, GCommT> {

         /**
          * This method is called when restoring from a failover.
          * @param globalCommittables the global committables that were not
          *        committed in the previous session.
          * @return the global committables that should be committed again
          *         in the current session.
          */
         List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);

         /**
          * Compute an aggregated committable from a collection of
          * committables.
          * @param committables the committables to combine
          * @return an aggregated committable
          */
         GCommT combine(List<CommT> committables);

         void commit(List<GCommT> globalCommittables);

         /**
          * Signals that there are no more committables.
          */
         void endOfInput();
}
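
As an illustration of how the four methods could fit together, here is a toy implementation. It is only a sketch under stated assumptions: `ManifestCommitter` is a hypothetical name, the committables are plain file names, the global committable is a list of file names, and an in-memory set stands in for the external system. The interface is repeated so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Copy of the proposed interface, repeated so the sketch is self-contained. */
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

/** Toy global committer: combines file names into a manifest and "commits" manifests. */
final class ManifestCommitter implements GlobalCommitter<String, List<String>> {
    /** Stands in for the external system that remembers committed manifests. */
    final Set<List<String>> committed = new HashSet<>();
    boolean finished = false;

    @Override
    public List<List<String>> filterRecoveredCommittables(List<List<String>> recovered) {
        // Only manifests the external system has not seen need to be committed again.
        List<List<String>> pending = new ArrayList<>();
        for (List<String> manifest : recovered) {
            if (!committed.contains(manifest)) {
                pending.add(manifest);
            }
        }
        return pending;
    }

    @Override
    public List<String> combine(List<String> committables) {
        return new ArrayList<>(committables); // the manifest is just the list of files
    }

    @Override
    public void commit(List<List<String>> globalCommittables) {
        committed.addAll(globalCommittables); // adding twice is harmless, so commit is idempotent
    }

    @Override
    public void endOfInput() {
        finished = true;
    }
}
```

The sketch relies on `commit` being idempotent; a real sink would have to ask its external system which global committables were already applied.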

Best,
Guowei
