Sounds good. The important aspect is working with savepoints - makes sense.
I have a few follow-up questions:

Is there a reason we are excluding this from the first version?
How will the bundling work if this module is in flink-connector-kafka?
Won't it rely on parts of core Flink? Are we adding the state-processor-api
as a dependency to this module?

Looking at the FLIP, I empathize with the desire and I think it is a great
addition to the overall streaming community.

Ryan van Huuksloot
Staff Engineer, Infrastructure | Streaming Platform


On Mon, May 4, 2026 at 12:48 PM Aleksandr Savonin <[email protected]>
wrote:

> Hi Ryan,
> Thanks for the questions and arguments.
>
> The problem this FLIP addresses is concrete: when a Flink job is no
> longer running but checkpointed data exists in an uncommitted Kafka
> transaction, there is currently no way to commit that data. The
> transaction will eventually be aborted by the broker, and the data is
> lost. We need a solution for this issue.
>
> The FLIP's Future Work section proposes State Processor API
> integration to automatically extract `producerId` and `epoch` from
> Flink checkpoint/savepoint metadata. This would let operators run the
> tool without manual data retrieval. Such functionality belongs in
> Flink: it requires reading Flink-specific state and could not live
> in Kafka's repo.
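>
> As a rough illustration of what that future integration might look like
> (the operator uid, state name, and `KafkaCommittableInfo` type below are
> placeholders, not the sink's actual state layout):
>
>   import org.apache.flink.api.common.typeinfo.TypeInformation;
>   import org.apache.flink.state.api.OperatorIdentifier;
>   import org.apache.flink.state.api.SavepointReader;
>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>   StreamExecutionEnvironment env =
>       StreamExecutionEnvironment.getExecutionEnvironment();
>   SavepointReader savepoint =
>       SavepointReader.read(env, "file:///savepoints/savepoint-xyz");
>
>   // list the pending transactions recorded in the snapshot so the tool
>   // can be pointed at them without manual data retrieval
>   savepoint
>       .readListState(
>           OperatorIdentifier.forUid("kafka-sink-committer"),
>           "txn-committables",
>           TypeInformation.of(KafkaCommittableInfo.class))
>       .print(); // (transactionalId, producerId, epoch) per committable
>   env.execute("inspect-savepoint");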
>
> On maintenance burden - the proposed module is very small and
> isolated. The working draft
> (https://github.com/apache/flink-connector-kafka/pull/238) reuses
> existing `FlinkKafkaInternalProducer` logic that the connector already
> maintains for its primary commit path; the new code is essentially a
> CLI wrapper around it.
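>
> The mechanism, stripped to its core (bootstrap server, transactional id,
> producerId and epoch below are placeholder values, and the class is
> connector-internal API, shown only for illustration):
>
>   import java.util.Properties;
>   import org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer;
>
>   Properties props = new Properties();
>   props.put("bootstrap.servers", "broker:9092");
>   props.put("key.serializer",
>       "org.apache.kafka.common.serialization.ByteArraySerializer");
>   props.put("value.serializer",
>       "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>   try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
>           new FlinkKafkaInternalProducer<>(props, "my-sink-tid-0")) {
>       // re-attach to the still-open transaction with the producerId and
>       // epoch recovered from the checkpoint (no initTransactions() call,
>       // which would fence and abort it), then finish the 2PC commit
>       producer.resumeTransaction(4123L, (short) 7);
>       producer.commitTransaction();
>   }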
>
> So while the abort case could in principle be handled by Kafka
> tooling, the commit case and its natural evolution (e.g. toward
> checkpoint-aware automation) are tied to Flink's state model. That is
> one of the arguments why this belongs in the Flink ecosystem.
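>
> For reference, Kafka's own tooling (KIP-664) can already force an abort
> of a hanging transaction - roughly like this, with placeholder
> topic/partition/offset values - but it has no commit counterpart, which
> is exactly the gap described above:
>
>   bin/kafka-transactions.sh --bootstrap-server broker:9092 \
>     abort --topic out --partition 0 --start-offset 4200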
>
>
> Kind regards,
>
> Aleksandr Savonin
>
> On Mon, 4 May 2026 at 16:34, Ryan van Huuksloot via dev
> <[email protected]> wrote:
> >
> > Hello,
> >
> > Sorry for the late reply.
> >
> > Reading this FLIP, I don't clearly understand why this belongs in the
> > Flink ecosystem. Nothing seems unique to Flink other than its
> > implementation of Exactly-Once semantics. We aren't the only
> > Exactly-Once producer.
> >
> > I read the following from the mailing list.
> >
> > > For exactly-once delivery, Flink acts as the orchestrator on top of
> > > Kafka's transactions, deciding when to commit based on checkpoint
> > > completion. The problem arises when Flink disappears (there is no
> > > running Flink job anymore) and nobody is left to tell Kafka's
> > > coordinator to commit or abort. Kafka's coordinator in this case is
> > > behaving as designed - it holds the transaction open because it has no
> > > way to know whether the data should be committed or aborted. That
> > > decision belongs to Flink, which is no longer running.
> >
> > The decision belonged* to Flink. However, once we start discussing using
> > a CLI to complete transactions, the decision no longer belongs to Flink.
> > The user employing that CLI should decide what to do with that
> > transaction. That boundary no longer requires Flink.
> >
> > I would push back pretty hard on adding this maintenance burden to this
> > community. It seems to me that Kafka has an API limitation that we are
> > ignoring, pushing this responsibility onto Flink.
> >
> > Before we add this to Flink, can we discuss more why it belongs in this
> > community? I see in your FLIP the `Use Kafka's built-in
> > kafka-transactions.sh tool` alternative, which is fine, but we don't
> > discuss what could be added or changed there.
> >
> > Ryan van Huuksloot
> > Staff Engineer, Infrastructure | Streaming Platform
> >
> >
> > On Fri, Apr 24, 2026 at 7:44 AM Aleksandr Savonin <[email protected]>
> > wrote:
> >
> > > Hi Shekhar and Hongshun,
> > > Thank you for the questions and the participation.
> > > Could you please let me know if you have any further questions? If not, I
> > > will start a voting thread.
> > >
> > > On Fri, 10 Apr 2026 at 14:07, Aleksandr Savonin <[email protected]>
> > > wrote:
> > > >
> > > > Hi Shekhar Prasad Rajak,
> > > >
> > > > > Don't we have a timeout and session timeout on the Kafka side?
> > > >
> > > > Yes, Kafka does have a timeout, and when the timeout fires, the
> > > > transaction is aborted by the broker. However, if Flink successfully
> > > > completed a checkpoint but didn't commit the transaction (e.g. the
> > > > JVM crashed before committing the transaction to Kafka), the data
> > > > should be committed to satisfy the 2PC protocol's promises. Relying
> > > > on the timeout may lead to data loss in this scenario.
> > > > Moreover, some Flink-Kafka setups use high values for Kafka
> > > > transaction timeouts; in that case, even transactions that must be
> > > > aborted (because they were not checkpointed) will not be aborted
> > > > until the timeout fires, blocking the LSO. The tool provides a way to
> > > > resolve this immediately.
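> > > >
> > > > For context, such setups raise the producer-side timeout toward the
> > > > broker's transaction.max.timeout.ms cap. A minimal sketch (topic
> > > > name and id prefix are placeholders):
> > > >
> > > >   KafkaSink<String> sink = KafkaSink.<String>builder()
> > > >       .setBootstrapServers("broker:9092")
> > > >       .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> > > >       .setTransactionalIdPrefix("my-sink")
> > > >       // must stay within the broker's transaction.max.timeout.ms
> > > >       .setProperty("transaction.timeout.ms", "3600000") // 1 hour
> > > >       .setRecordSerializer(
> > > >           KafkaRecordSerializationSchema.builder()
> > > >               .setTopic("out")
> > > >               .setValueSerializationSchema(new SimpleStringSchema())
> > > >               .build())
> > > >       .build();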
> > > >
> > > > Kind regards,
> > > > Aleksandr
> > > >
> > > > On Wed, 8 Apr 2026 at 05:34, Shekhar Rajak
> > > > <[email protected]> wrote:
> > > > >
> > > > > Thanks for clarifying, but
> > > > > > it holds the transaction open because it has no way to know
> > > > > > whether the data should be committed or aborted. That decision
> > > > > > belongs to Flink, which is no longer running.
> > > > >
> > > > > Don't we have a timeout and session timeout on the Kafka side?
> > > > > After enabling 2PC, we must be using those configs to make sure
> > > > > the Kafka broker releases the locks and preserves durability.
> > > > >
> > > > > Regards,
> > > > > Shekhar Prasad Rajak
> > > > >
> > > > >
> > > > >     On Monday 30 March 2026 at 04:34:07 pm GMT+5:30, Aleksandr
> > > > > Savonin <[email protected]> wrote:
> > > > >
> > > > >  Hi Shekhar Rajak,
> > > > > Good questions.
> > > > > Let me clarify the transaction coordination model and why a
> > > > > Flink-specific tool is needed.
> > > > >
> > > > > > Curious to know if we debug and analyse the root cause and try
> > > > > > to fix/enhance the Kafka transaction coordinator side, will this
> > > > > > issue be resolved?
> > > > >
> > > > >
> > > > > Kafka has a built-in Transaction Coordinator, but it only manages
> > > > > the Kafka protocol layer: it executes commit/abort when told to.
> > > > > For exactly-once delivery, Flink acts as the orchestrator on top
> > > > > of Kafka's transactions, deciding when to commit based on
> > > > > checkpoint completion. The problem arises when Flink disappears
> > > > > (there is no running Flink job anymore) and nobody is left to tell
> > > > > Kafka's coordinator to commit or abort. Kafka's coordinator in this
> > > > > case is behaving as designed - it holds the transaction open
> > > > > because it has no way to know whether the data should be committed
> > > > > or aborted. That decision belongs to Flink, which is no longer
> > > > > running.
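> > > > >
> > > > > Schematically (a simplified sketch, not the actual connector code):
> > > > >
> > > > >   // phase 1, on the checkpoint barrier: pre-commit
> > > > >   producer.flush();                 // data sits in the open txn
> > > > >   checkpointState.add(txnMetadata); // (txnId, producerId, epoch)
> > > > >   // phase 2, on notifyCheckpointComplete: commit
> > > > >   producer.commitTransaction();
> > > > >   // a crash between the two phases leaves the transaction open,
> > > > >   // and only Flink's checkpointed state knows it must be committed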
> > > > >
> > > > > > Do we really need to think about a transaction management tool
> > > > > > specifically for Kafka?
> > > > >
> > > > >
> > > > > The Kafka sink is one of the most widely used Flink connectors,
> > > > > and exactly-once delivery with Kafka transactions is a primary
> > > > > production use case in some companies.
> > > > >
> > > > > --
> > > > > Kind regards,
> > > > > Aleksandr
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Kind regards,
> > > > Aleksandr
> > >
> > >
> > >
> > > --
> > > Kind regards,
> > > Aleksandr
> > >
>
>
>
> --
> Kind regards,
> Aleksandr
>
