Hi Aleksandr,
> it is not enough to prevent lingering transactions in the scenarios
> described in the FLIP,

This is why `transactionNamingStrategy = POOLING` was introduced: it lists
pending transactions via
`org.apache.kafka.clients.admin.Admin#listTransactions`.
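
For illustration, a rough sketch of that listing step (assuming the Kafka
3.x Admin API; the broker address and the "flink-" id prefix below are
placeholders, not the connector's actual naming scheme):

    import java.util.Collection;
    import java.util.EnumSet;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTransactionsOptions;
    import org.apache.kafka.clients.admin.TransactionListing;
    import org.apache.kafka.clients.admin.TransactionState;

    public class ListOngoingTransactions {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Only ONGOING transactions hold back the Last Stable Offset.
                ListTransactionsOptions options = new ListTransactionsOptions()
                        .filterStates(EnumSet.of(TransactionState.ONGOING));
                Collection<TransactionListing> listings =
                        admin.listTransactions(options).all().get();
                for (TransactionListing txn : listings) {
                    // Keep only transactions that belong to this sink's prefix.
                    if (txn.transactionalId().startsWith("flink-")) {
                        System.out.printf("%s producerId=%d state=%s%n",
                                txn.transactionalId(), txn.producerId(), txn.state());
                    }
                }
            }
        }
    }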

> when the Flink process terminates unexpectedly. E.g., when the JVM
> crashes or is OS-OOM-killed, no shutdown hook runs and no cleanup occurs.

When the job restarts, the cleanup will be performed again.

By the way, using the DataStream JAR approach is not convenient. I am
considering whether to use Flink CALL procedures [1]; however, this
requires the Kafka connector to support catalogs. Currently, due to the
tight coupling between Kafka's type mapping and its different formats
(JSON, CSV, etc.), implementing catalog support is not feasible.
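
For context, a hypothetical sketch of what such a procedure could look
like if a Kafka catalog existed (the class name and CALL path are made up;
CALL procedures are resolved through Catalog#getProcedure, which is why
catalog support is the prerequisite):

    import org.apache.flink.table.procedure.ProcedureContext;
    import org.apache.flink.table.procedures.Procedure;

    // Hypothetical only: not part of any existing connector.
    public class AbortTransactionsProcedure implements Procedure {

        // Would be invoked as, e.g.:
        //   CALL kafka_catalog.sys.abort_transactions('my-txn-prefix');
        public String[] call(ProcedureContext context, String transactionalIdPrefix) {
            // Here one would list transactions via the Kafka Admin client
            // and abort those matching the prefix (see the sketch above).
            return new String[] {"aborted: 0"};
        }
    }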

Best,
Hongshun

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/procedures



On Tue, Mar 31, 2026 at 8:33 PM Aleksandr Savonin <[email protected]>
wrote:

> Hi Hongshun Wang,
> thanks for your interest in this topic!
>
> > To mitigate this, I suggest embedding transaction abort logic directly
> into the sink's shutdown path
>
> Flink already has abort logic in the shutdown path, but it is not
> enough to prevent lingering transactions in the scenarios described in
> the FLIP, when the Flink process terminates unexpectedly. E.g., when
> the JVM crashes or is OS-OOM-killed, no shutdown hook runs and no
> cleanup occurs.
> On top of that, the shutdown path can only abort, not commit. The data
> may have been successfully checkpointed by Flink, but the checkpoint
> completion notification may not have been received yet, so the
> shutdown path cannot safely execute a commit. Aborting this
> transaction means losing data that Flink considers successfully
> delivered.
> The CLI tool's commit operation is the only way to recover this data.
> The tool is designed as a complement to Flink's built-in cleanup, not
> a replacement; it covers the gap that no in-process mechanism can
> fully close.
>
>
> > the 15-minute transaction timeout window makes it operationally fragile.
>
> You are probably referring to the broker's transaction.max.timeout.ms,
> which defaults to 15 minutes. Flink's default transaction.timeout.ms
> is 1 hour (see KafkaSinkBuilder.DEFAULT_KAFKA_TRANSACTION_TIMEOUT).
> The KafkaSink docs strongly recommend tuning the Kafka transaction
> timeout; otherwise data loss may happen. If data loss is unacceptable,
> operators may increase it further, making the window where downstream
> consumers are blocked even longer (hours or days), which strengthens
> the case for a tool that can resolve the situation immediately.
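>
> To make the interplay concrete, a minimal sketch (assuming a recent
> flink-connector-kafka builder API; the topic, prefix and addresses are
> examples only; transaction.timeout.ms must stay at or below the
> broker's transaction.max.timeout.ms, or producer initialization fails):
>
>     import java.util.Properties;
>
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.connector.base.DeliveryGuarantee;
>     import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
>     import org.apache.flink.connector.kafka.sink.KafkaSink;
>
>     public class ExactlyOnceSinkConfig {
>         public static KafkaSink<String> build() {
>             Properties props = new Properties();
>             // Must not exceed the broker's transaction.max.timeout.ms.
>             props.setProperty("transaction.timeout.ms", "3600000"); // 1 hour
>             return KafkaSink.<String>builder()
>                     .setBootstrapServers("localhost:9092")
>                     .setKafkaProducerConfig(props)
>                     .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                     .setTransactionalIdPrefix("my-job")
>                     .setRecordSerializer(
>                             KafkaRecordSerializationSchema.builder()
>                                     .setTopic("output-topic")
>                                     .setValueSerializationSchema(
>                                             new SimpleStringSchema())
>                                     .build())
>                     .build();
>         }
>     }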
>
> On Tue, 31 Mar 2026 at 04:17, Hongshun Wang <[email protected]>
> wrote:
> >
> > Hi Aleksandr,
> >
> > > This scenario occurs when a job with exactly-once KafkaSink is
> > > permanently stopped between a checkpoint and the subsequent commit.
> > > The lingering transaction blocks downstream read_committed consumers
> > > until the transaction timeout fires, and the transaction data is lost
> > > upon abort.
> >
> > To mitigate this, I suggest embedding transaction abort logic
> > directly into the sink's shutdown path. While a separate cleanup job
> > might seem like a workaround, the 15-minute transaction timeout
> > window makes it operationally fragile. Integrating this natively
> > would ensure deterministic cleanup without relying on external
> > triggers or manual intervention.
> >
> > Best,
> > Hongshun
> >
> > On Mon, Mar 30, 2026 at 8:13 PM Aleksandr Savonin <[email protected]>
> > wrote:
> >
> > > Hi Hongshun,
> > > Thank you for the feedback.
> > >
> > > > Heise's work under FLIP-511 has already addressed most client-side
> > > transaction issues.
> > >
> > > FLIP-511 improves Flink's built-in recovery. But Flink's native
> > > recovery only helps when the job restarts. This tool targets scenarios
> > > described in the FLIP's motivation - any scenario where Flink fails to
> > > commit or abort a transaction and the Flink job is no longer running. In
> > > these cases there is no Flink runtime to execute the recovery logic,
> > > and operators currently have no supported tool.
> > >
> > >
> > > > Are you still hitting blockers in production?
> > >
> > > This scenario occurs when a job with exactly-once KafkaSink is
> > > permanently stopped between a checkpoint and the subsequent commit.
> > > The lingering transaction blocks downstream read_committed consumers
> > > until the transaction timeout fires, and the transaction data is lost
> > > upon abort.
> > >
> > >
> > > > The CLI logic seems to duplicate what already runs during Flink
> > > > Writer restart/recovery. Could you clarify why a separate CLI is
> > > > needed instead of relying on the native recovery path?
> > >
> > > Flink's committer does resume and commit transactions, but only within
> > > a running job. When the job is permanently gone, there is no Flink
> > > process to execute that logic. The CLI gives operators the ability to
> > > manually commit an abandoned transaction, preserving data that would
> > > otherwise be lost.
> > >
> > >
> > > > And I also don't think flink-dist needs a CLI for a connector; it
> > > > would be better to add it in the flink-connector-kafka repo.
> > >
> > > Agreed - this is already proposed as a module within
> > > flink-connector-kafka, packaged as a standalone uber-jar. It has no
> > > dependency on flink-dist.
> > >
> > >
> > > > This cannot be fixed client-side; only a Kafka broker update
> > > > (KIP-890 [2]) will resolve it.
> > >
> > > Fully agreed, that is a different problem. This tool targets the
> > > complementary case where the transaction is properly tracked by Kafka
> > > but no Flink process exists to commit or abort it.
> > >
> > > Kind regards,
> > > Aleksandr Savonin
> > >
> > > On Mon, 30 Mar 2026 at 04:44, Hongshun Wang <[email protected]>
> > > wrote:
> > > >
> > > > Hi Aleksandr,
> > > >
> > > > Thanks for your hard work on this CLI and for proactively surfacing
> > > > these production scenarios. I really appreciate your contributions.
> > > >
> > > > Heise's work under FLIP-511 has already addressed most client-side
> > > > transaction issues. Are you still hitting blockers in production?
> > > >
> > > > The CLI logic seems to duplicate what already runs during Flink
> > > > Writer restart/recovery. Could you clarify why a separate CLI is
> > > > needed instead of relying on the native recovery path? And I also
> > > > don't think flink-dist needs a CLI for a connector; it would be
> > > > better to add it in the flink-connector-kafka repo.
> > > >
> > > > One known gap remains: server-side transaction timeouts without
> > > > ADD_PARTITIONS_TO_TXN, where the server side terminates the
> > > > transaction. This cannot be fixed client-side; only a Kafka broker
> > > > update (KIP-890 [2]) will resolve it.
> > > >
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector?src=contextnavpagetreemode
> > > > [2]
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > > >
> > > > On Sat, Mar 28, 2026 at 8:12 PM Shekhar Rajak
> > > > <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > > Curious to know: if we debug and analyse the root cause and try
> > > > > to fix/enhance the Kafka transaction coordinator side, will this
> > > > > issue be resolved? Do we really need a transaction management
> > > > > tool specifically for Kafka?
> > > > > Regards,
> > > > > Shekhar Rajak,
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wednesday 25 March 2026 at 05:51:20 pm GMT+5:30, Aleksandr
> > > > > Savonin <[email protected]> wrote:
> > > > >
> > > > > Hi everyone,
> > > > > I'd like to start a discussion on FLIP-572 [1].
> > > > > When a Flink job using exactly-once KafkaSink fails and does not
> > > > > recover, Kafka transactions can remain in the ONGOING state,
> > > > > blocking all downstream read_committed consumers at the Last
> > > > > Stable Offset until the broker timeout expires.
> > > > > There is currently no built-in tooling to resolve this: Kafka's
> > > > > own kafka-transactions.sh cannot commit Flink transactions, since
> > > > > that requires Flink-specific internals.
> > > > > This FLIP proposes a standalone CLI tool that allows operators to
> > > > > abort or commit lingering transactions without a running Flink
> > > > > cluster.
> > > > > Looking forward to your feedback.
> > > > >
> > > > > Kind regards,
> > > > > Aleksandr Savonin
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-572%3A+Introduce+Flink-Kafka+Transactions+Management+Tool
> > > > >
> > >
> > >
> > >
> > > --
> > > Kind regards,
> > > Aleksandr
> > >
>
>
>
> --
> Kind regards,
> Aleksandr
>
