Hi Ryan,
Thank you for the questions.

Answering each in turn:

> Is there a reason we are excluding this from the first version?

Iterative delivery. The core capability (abort/commit a known
transactional ID) is well-defined, has a working draft, and addresses
a problem that operators are facing today, there is currently no way
to recover data from a lingering transaction when the Flink job is
gone. The v1 solution is needed now. State Processor API integration
adds a separate set of concerns and would extend the review time. Once
the foundation is in, the integration can be added in a follow-up
without changing the v1 interface.

> How will the bundling work if this module is in flink-connector-kafka? Won't 
> it rely on parts of core flink? Are we adding the state-processor-api as a 
> dependency to this module?

V1 ships as a self-contained uber-jar built with maven-shade-plugin in
the flink-connector-kafka-transaction-tool module. It bundles
kafka-clients, FlinkKafkaInternalProducer, commons-cli, and
slf4j-simple. It does not depend on flink-state-processor-api,
flink-runtime, or flink-streaming-java.
For the future State Processor API integration,
flink-state-processor-api would be required, which has a larger
footprint. To keep the basic tool lightweight, the integration could
be packaged as a separate sub-module or an additional uber-jar variant
(e.g. `-with-state-processor`).
Operators who only need to abort/commit avoid the larger dependency
and those who need savepoint-aware automation opt into it explicitly.

On Mon, 4 May 2026 at 19:52, Ryan van Huuksloot via dev
<[email protected]> wrote:
>
> Sounds good. The important aspect is working with savepoints - makes sense.
> I have a few follow-up questions:
>
> Is there a reason we are excluding this from the first version?
> How will the bundling work if this module is in flink-connector-kafka?
> Won't it rely on parts of core flink? Are we adding the state-processor-api
> as a dependency to this module?
>
> Looking at the FLIP, I empathize with the desire and I think it is a great
> addition to the overall streaming community.
>
> Ryan van Huuksloot
> Staff Engineer, Infrastructure | Streaming Platform
> [image: Shopify]
> <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
>
>
> On Mon, May 4, 2026 at 12:48 PM Aleksandr Savonin <[email protected]>
> wrote:
>
> > Hi Ryan,
> > Thanks for the questions and arguments.
> >
> > The problem this FLIP addresses is concrete: when a Flink job is no
> > longer running but checkpointed data exists in an uncommitted Kafka
> > transaction, there is currently no way to commit that data. The
> > transaction will eventually be aborted by the broker, and the data is
> > lost. We need a solution for this issue.
> >
> > The FLIP's Future Work section proposes State Processor API
> > integration to automatically extract `producerId` and `epoch` from
> > Flink checkpoint/savepoint metadata. This would let operators run the
> > tool without manual data retrieval. Such functionality belongs in
> > Flink -> it requires reading Flink-specific state and could not live
> > in Kafka's repo.
> >
> > On maintenance burden - the proposed module is very small and
> > isolated. The working draft
> > (https://github.com/apache/flink-connector-kafka/pull/238) reuses
> > existing `FlinkKafkaInternalProducer` logic that the connector already
> > maintains for its primary commit path, the new code is essentially a
> > CLI wrapper around it.
> >
> > So while the abort case could in principle be handled by Kafka
> > tooling, the commit case and its natural evolution (e.g. toward
> > checkpoint-aware automation) is tied to Flink's state model. That is
> > one of the arguments why this belongs in the Flink ecosystem.
> >
> >
> > Kind regards,
> >
> > Aleksandr Savonin
> >
> > On Mon, 4 May 2026 at 16:34, Ryan van Huuksloot via dev
> > <[email protected]> wrote:
> > >
> > > Hello,
> > >
> > > Sorry for the late reply.
> > >
> > > Reading this FLIP, I don't clearly understand why this belongs in the
> > Flink
> > > ecosystem. Nothing seems unique to Flink other than its implementation of
> > > Exactly-Once semantics. We aren't the only Exactly-Once producer.
> > >
> > > I read the following from the mailing list.
> > >
> > > >For exactly-once delivery, Flink acts as the orchestrator on top of
> > > Kafka's transactions, deciding when to commit based on checkpoint
> > > completion. The problem arises when Flink disappears (there is no
> > > running Flink job anymore) and nobody is left to tell Kafka's
> > > coordinator to commit or abort. Kafka's coordinator in this case is
> > > behaving as designed - it holds the transaction open because it has no
> > > way to know whether the data should be committed or aborted. That
> > > decision belongs to Flink, which is no longer running.
> > >
> > > The decision belonged* to Flink. However once we start discussing using a
> > > CLI to complete transactions, the decision no longer belongs to Flink.
> > The
> > > user employing that CLI should decide what to do with that transaction.
> > > That boundary no longer requires Flink.
> > >
> > > I would push back pretty hard on adding this maintenance burden to this
> > > community. It seems to me that Kafka has an API limitation that we are
> > > ignoring, pushing this responsibility onto Flink.
> > >
> > > Before we add this to Flink, can we discuss more why it belongs in this
> > > community? I see in your FLIP the `Use Kafka's built-in
> > > kafka-transactions.sh tool`, which is fine but we don't discuss what
> > could
> > > be added or changed there.
> > >
> > > Ryan van Huuksloot
> > > Staff Engineer, Infrastructure | Streaming Platform
> > > [image: Shopify]
> > > <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email
> > >
> > >
> > >
> > > On Fri, Apr 24, 2026 at 7:44 AM Aleksandr Savonin <[email protected]>
> > > wrote:
> > >
> > > > Hi Shekhar and Hongshun,
> > > > Thank you for the questions and the participation.
> > > > Could you please clarify if you have any further questions? If not, I
> > > > will start a voting thread.
> > > >
> > > > On Fri, 10 Apr 2026 at 14:07, Aleksandr Savonin <[email protected]>
> > > > wrote:
> > > > >
> > > > > Hi Shekhar Prasad Rajak,
> > > > >
> > > > > > Don't we have timeout, session timeout at kafka side ?
> > > > >
> > > > > Yes, Kafka does have a timeout and when the timeout fires, the
> > > > > transaction is aborted by the broker. However, if Flink successfully
> > > > > completed a checkpoint but didn't commit the transaction (e.g. the
> > JVM
> > > > > crashed before committing the transaction to Kafka), the data should
> > > > > be committed to satisfy 2PC protocol promises. Relying on the timeout
> > > > > may belong to data loss in this scenario.
> > > > > Moreover, some Flink-Kafka setups use high values for Kafka
> > > > > transaction timeouts, in that case, even transactions that must be
> > > > > aborted (because they were not checkpointed), will not be aborted
> > > > > until timeout fires, blocking the LSO. The tool provides a way to
> > > > > resolve this immediately.
> > > > >
> > > > > Kind regards,
> > > > > Aleksandr
> > > > >
> > > > > On Wed, 8 Apr 2026 at 05:34, Shekhar Rajak <
> > [email protected]>
> > > > wrote:
> > > > > >
> > > > > > Thanks for clarifying but
> > > > > > >  it holds the transaction open because it has noway to know
> > whether
> > > > the data should be committed or aborted. That
> > > > > > decision belongs to Flink, which is no longer running.
> > > > > >
> > > > > > Don't we have timeout, session timeout at kafka side ? After
> > enabling
> > > > 2PC we must be using those config to make sure Kafka broker releases
> > the
> > > > locks and have durability.
> > > > > >
> > > > > > Regards,
> > > > > > Shekhar Prasad Rajak
> > > > > >
> > > > > >
> > > > > >     On Monday 30 March 2026 at 04:34:07 pm GMT+5:30, Aleksandr
> > Savonin
> > > > <[email protected]> wrote:
> > > > > >
> > > > > >  Hi Shekhar Rajak,
> > > > > > Good questions.
> > > > > > Let me clarify the transaction coordination model and why a
> > > > > > Flink-specific tool is needed.
> > > > > >
> > > > > > > Curious to know if we debug and analyse the root cause and try to
> > > > fix/enhance in the Kafka transaction coordinator side, will this issue
> > be
> > > > resolved ?
> > > > > >
> > > > > >
> > > > > > Kafka has a built-in Transaction Coordinator, but it only manages
> > the
> > > > > > Kafka protocol layer, it executes commit/abort when told to.
> > > > > > For exactly-once delivery, Flink acts as the orchestrator on top of
> > > > > > Kafka's transactions, deciding when to commit based on checkpoint
> > > > > > completion. The problem arises when Flink disappears (there is no
> > > > > > running Flink job anymore) and nobody is left to tell Kafka's
> > > > > > coordinator to commit or abort. Kafka's coordinator in this case is
> > > > > > behaving as designed - it holds the transaction open because it
> > has no
> > > > > > way to know whether the data should be committed or aborted. That
> > > > > > decision belongs to Flink, which is no longer running.
> > > > > >
> > > > > > > Do we really need to think about transaction management tool
> > > > specifically for kafka ?
> > > > > >
> > > > > >
> > > > > > The Kafka sink is one of the most widely used Flink connectors, and
> > > > > > exactly-once delivery with Kafka transactions is a primary
> > production
> > > > > > use case in some companies.
> > > > > >
> > > > > > --
> > > > > > Kind regards,
> > > > > > Aleksandr
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Kind regards,
> > > > > Aleksandr
> > > >
> > > >
> > > >
> > > > --
> > > > Kind regards,
> > > > Aleksandr
> > > >
> >
> >
> >
> > --
> > Kind regards,
> > Aleksandr
> >



-- 
Kind regards,
Aleksandr

Reply via email to