Thanks for the answers, Fabian! To follow up:

*1.* SGTM, thanks!

*2.* << If some of the RPCs are delayed or do not reach the manager, the
Committer will accumulate committables from multiple checkpoints. >>
So does that mean it's possible for the Committer to commit committables for
checkpointIds=[2], then checkpointIds=[1], then checkpointIds=[3, 4]?
That is:
  - (*2a*) the committable checkpointIds can be out of order, and
  - (*2b*) there can be committables for multiple checkpointIds in a given
Committer::commit call?
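
To make (2b) concrete, here is a minimal sketch of how I imagine our Committer
could handle a batch that mixes checkpoints: group the committables by
checkpointId and process the groups in ascending order. `DeltaCommittable` and
`groupByCheckpoint` are hypothetical names in plain Java, not Flink types, and
the sketch only orders committables within a single call, so it would not help
with (2a) across calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class CommittableBatching {

    /** Hypothetical stand-in for our committable; not a Flink type. */
    public static final class DeltaCommittable {
        public final long checkpointId;
        public final String parquetPath;

        public DeltaCommittable(long checkpointId, String parquetPath) {
            this.checkpointId = checkpointId;
            this.parquetPath = parquetPath;
        }
    }

    /**
     * Groups one batch of committables by checkpointId.
     * A TreeMap iterates its keys in ascending order, so the caller
     * can process the oldest checkpoint first.
     */
    public static TreeMap<Long, List<DeltaCommittable>> groupByCheckpoint(
            List<DeltaCommittable> batch) {
        TreeMap<Long, List<DeltaCommittable>> grouped = new TreeMap<>();
        for (DeltaCommittable c : batch) {
            grouped.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c);
        }
        return grouped;
    }
}
```

Whether each group should become its own Delta commit or all groups should be
merged into one is a separate design choice that this sketch leaves open.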

*3.* Imagine this scenario: the SinkWriters start writing parquet files.
The latest Delta table version they are aware of is version 10. The
SinkWriters create committables, and when the Committer tries to commit
them, it sees that the latest Delta table version is now 15 *and* someone
enabled Column Mapping at version 12. That means that, at and after version
12, all parquet files must be written using the physical column names, not
the logical column names. This means that the parquet files written by our
SinkWriters and referenced by the committables are *invalid*. In Spark, we
would fail the commit and would allow the writers to rebase, re-write the
data, and re-attempt the commit. I don't think this is possible in Flink?
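
As a rough sketch of the commit-time check I have in mind, using the numbers
from the scenario above (every name here is hypothetical; this is plain Java,
not the Delta or Flink API):

```java
import java.util.OptionalLong;

public class ColumnMappingCheck {

    /**
     * Returns true if parquet files written against readVersion are still
     * valid when committing on top of latestVersion.
     *
     * Assumption: the only incompatible change considered is Column Mapping
     * being enabled at some version after the writers read the table.
     */
    public static boolean filesStillValid(
            long readVersion, OptionalLong columnMappingEnabledAt, long latestVersion) {
        if (!columnMappingEnabledAt.isPresent()) {
            return true; // Column Mapping was never enabled
        }
        long enabledAt = columnMappingEnabledAt.getAsLong();
        // Files are stale only if the change landed after our read version
        // and is part of the table history we are committing on top of.
        boolean enabledAfterRead = enabledAt > readVersion && enabledAt <= latestVersion;
        return !enabledAfterRead;
    }
}
```

With readVersion=10, Column Mapping enabled at version 12, and latestVersion=15,
the check reports the files as stale, which is the point where Spark would
rebase and re-write.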

*4.* << Can you explain more about the relation between your questions and
using the SupportsPreCommitTopology::addPreCommitTopology? It sounds like
you are not planning to use the Committer but a custom globalCommitter. >>

Yup, we do want a single committer (also called a global committer). Having
a preCommitTopology that routes all the committables using `.global()` will
cause all the committables to go to a single (global) committer, correct?
(And yes, if there is job parallelism of N, then there will be N-1
committers instantiated but never called).

The V1-to-V2 Sink Adapter maps the V1GlobalCommitter to a
V2-*post*-commit-topology operator. When starting fresh with the V2 Sink API, this
post-commit-topology doesn't seem the most natural place to implement the
single committer. Further, we may want to do other optional optimizations
in the post-commit-topology. Having the commit done in the Committer and
optional optimizations done in a post-commit seems like the best choice.

Let me know what you think!

Thanks!

Scott

On Thu, Jun 27, 2024 at 2:52 AM Fabian Paul <fp...@apache.org> wrote:

> Hi Scott,
>
> It's great to see further adoption of the Sink V2 architecture.
> Happy to answer your questions.
>
> 1. The sink architecture should ensure that Committer:commit is always
> called with all committables from a subtask for a given subtaskId. There is
> an open issue where users have reported a problem with that assumption [1]
> but we haven't been able to track the problem down. Behind the scenes, if
> you use a SinkWriter->Committer topology, the Committables are transferred
> via the checkpoint barrier channel to the committer (not transferring the
> committables results in failing the checkpoint) and checkpointed by the
> committer before Committer:commit is called. This means when
> Committer:commit is called, it reads the committables from the local state.
>
> 2. Committer::commit is called on what we call in Flink
> notifyCheckpointComplete which is based on a RPC call that the Jobmanager
> makes to all Taskmanagers when a checkpoint is finished. There is no
> guarantee when or if this will be called, but eventually. If some of the
> RPCs are delayed or do not reach the manager, the Committer will accumulate
> committables from multiple checkpoints.
>
> 3. I am not sure I fully understand that point. I see two different
> requirements. First, you could skip a committable if you do not want to
> commit it, which you could do with calling
> CommitRequest::signalAlreadyCommitted [2]. It's not the primary purpose of
> the method, but it should suffice. The second point is having a
> communication mechanism between SinkWriter and Committer, which at the
> moment does not exist. I would love to hear more details about why the
> rewrite is necessary; maybe we can model the sink differently to achieve
> that requirement.
>
> Can you explain more about the relation between your questions and using
> the SupportsPreCommitTopology::addPreCommitTopology? It sounds like you are
> not planning to use the Committer but a custom globalCommitter.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-25920
> [2] https://github.com/apache/flink/blob/7fc3aac774f5deb9b48727ba5f916c78085b49b9/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java#L100
>


-- 
*Scott Sandre*
*Sr. Software Engineer*
*Delta Ecosystem Team*
*scott.san...@databricks.com <scott.san...@databricks.com>*
