Hi Aleksandr, > it is not enough to prevent lingering transactions in the scenarios described in the FLIP,
This is why `transactionNamingStrategy = POOLING` is introduced which lists pending transactions by `org.apache.kafka.clients.admin.Admin#listTransactions`. > when the Flink process terminates unexpectedly. E.g., when the JVM crashes or is OS-OOM-killed, no shutdown hook runs and no cleanup occurs. When the job restarts, the cleanup will be operated again. By the way, Using the DataStream JAR approach is not convenient. I am considering whether to use Flink CALL procedures[1] ; however, this requires Kafka to support catalogs. Currently, due to the tight coupling between Kafka's type mapping and its different format type(json , csv or else), implementing catalog support is not feasible. Best, Hongshun [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/procedures On Tue, Mar 31, 2026 at 8:33 PM Aleksandr Savonin <[email protected]> wrote: > Hi Hongshun Wang, > thanks for your interest in this topic! > > > To mitigate this, I suggest embedding transaction abort logic directly > into the sink's shutdown path > > Flink already has abort logic in the shutdown path, but it is not > enough to prevent lingering transactions in the scenarios described in > the FLIP, when the Flink process terminates unexpectedly. E.g., when > the JVM crashes or is OS-OOM-killed, no shutdown hook runs and no > cleanup occurs. > On top of that, the shutdown path can only abort, not commit. The data > may have been successfully checkpointed by Flink, but the checkpoint > completion notification may not have been received yet, so the > shutdown path cannot safely execute a commit. Aborting this > transaction means losing data that Flink considers successfully > delivered. > The CLI tool's commit operation is the only way to recover this data. > The tool is designed as a complement to Flink's built-in cleanup, not > a replacement, it covers the gap that no in-process mechanism can > fully close. > > > > the 15-minute transaction timeout window makes it operationally fragile. > > You are probably referring to the broker's max.transaction.timeout.ms, > which defaults to 15 minutes. Flink's default transaction.timeout.ms > is 1 hour (see KafkaSinkBuilder.DEFAULT_KAFKA_TRANSACTION_TIMEOUT). > The KafkaSink docs highly recommend tweaking the Kafka transaction > timeout, otherwise data loss may happen. If data loss is unacceptable, > operators may increase it further, making the window where downstream > consumers are blocked even longer (hours or days), which strengthens > the case for a tool that can resolve the situation immediately. > > On Tue, 31 Mar 2026 at 04:17, Hongshun Wang <[email protected]> > wrote: > > > > Hi Aleksandr, > > > > > This scenario occurs when a job with exactly-once KafkaSink is > > permanently stopped between a checkpoint and the subsequent commit. > > The lingering transaction blocks downstream read_committed consumers > > until the transaction timeout fires, and the transaction data is lost > > upon abort. > > > > To mitigate this, I suggest embedding transaction abort logic directly > into > > the sink's shutdown path. While a separate cleanup job might seem like a > > workaround, the 15-minute transaction timeout window makes it > operationally > > fragile. Integrating this natively would ensure deterministic cleanup > > without relying on external triggers or manual intervention. > > > > Best, > > Hongshun > > > > On Mon, Mar 30, 2026 at 8:13 PM Aleksandr Savonin <[email protected]> > > wrote: > > > > > Hi Hongshun, > > > Thank you for the feedback. > > > > > > > Heise's work under FLIP-511 has already addressed most client-side > > > transaction issues. > > > > > > FLIP-511 improves Flink's built-in recovery. But Flink's native > > > recovery only helps when the job restarts. This tool targets scenarios > > > described in the FLIP's motivation - any scenario where Flink fails to > > > commit or abort a transaction and Flink job is no longer running. In > > > these cases there is no Flink runtime to execute the recovery logic, > > > and operators currently have no supported tool. > > > > > > > > > > Are you still hitting blockers in production? > > > > > > This scenario occurs when a job with exactly-once KafkaSink is > > > permanently stopped between a checkpoint and the subsequent commit. > > > The lingering transaction blocks downstream read_committed consumers > > > until the transaction timeout fires, and the transaction data is lost > > > upon abort. > > > > > > > > > > The CLI logic seems to duplicate what already runs during Flink > Writer > > > restart/recovery. Could you clarify why a separate CLI is needed > instead of > > > relying on the native recovery path? > > > > > > Flink's committer does resume and commit transactions, but only within > > > a running job. When the job is permanently gone, there is no Flink > > > process to execute that logic. The CLI gives operators the ability to > > > manually commit an abandoned transaction, preserving data that would > > > otherwise be lost. > > > > > > > > > > And I also don't think flink dist needs a cli about a connector's cli > > > which is better to add in flink kafka connector' repo. > > > > > > Agreed - this is already proposed as a module within > > > flink-connector-kafka, packaged as a standalone uber-jar. It has no > > > dependency on flink dist. > > > > > > > > > > This cannot be fixed client-side and only a Kafka broker update > (KIP-890 > > > [2]) will resolve it. > > > > > > Fully agreed, that is a different problem. This tool targets the > > > complementary case where the transaction is properly tracked by Kafka > > > but no Flink process exists to commit or abort it. > > > > > > Kind regards, > > > Aleksandr Savonin > > > > > > On Mon, 30 Mar 2026 at 04:44, Hongshun Wang <[email protected]> > > > wrote: > > > > > > > > Hi Aleksandr, > > > > > > > > Thanks for your hard work on this CLI and for proactively surfacing > these > > > > production scenarios. I really appreciate your contributions. > > > > > > > > Heise's work under FLIP-511 has already addressed most client-side > > > > transaction issues. Are you still hitting blockers in production? > > > > > > > > The CLI logic seems to duplicate what already runs during Flink > Writer > > > > restart/recovery. Could you clarify why a separate CLI is needed > instead > > > of > > > > relying on the native recovery path? And I also don't think flink > dist > > > > needs a cli about a connector's cli which is better to add in flink > > > kafka > > > > connector' repo. > > > > > > > > One known gap remains: server-side transaction timeouts without > > > > ADD_PARTITIONS_TO_TXN , and server-side terminates the transaction. > This > > > > cannot be fixed client-side and only a Kafka broker update (KIP-890 > [2]) > > > > will resolve it. > > > > > > > > > > > > [1] > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-511%3A+Support+transaction+id+pooling+in+Kafka+connector?src=contextnavpagetreemode > > > > [2] > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense > > > > > > > > On Sat, Mar 28, 2026 at 8:12 PM Shekhar Rajak < > [email protected] > > > > > > > > wrote: > > > > > > > > > Hi, > > > > > Curious to know if we debug and analyse the root cause and try to > > > > > fix/enhance in the Kafka transaction coordinator side, will this > issue > > > be > > > > > resolved ? Do we really need to think about transaction management > tool > > > > > specifically for kafka ? > > > > > Regards, > > > > > Shekhar Rajak, > > > > > > > > > > > > > > > > > > > > > > > > > On Wednesday 25 March 2026 at 05:51:20 pm GMT+5:30, Aleksandr > > > Savonin < > > > > > [email protected]> wrote: > > > > > > > > > > Hi everyone, > > > > > I'd like to start a discussion on FLIP-572 [1]. > > > > > When a Flink job using exactly-once KafkaSink fails and does not > > > recover, > > > > > Kafka transactions can remain in the ONGOING state, blocking all > > > downstream > > > > > read_committed consumers at the Last Stable Offset until the broker > > > timeout > > > > > expires. > > > > > There is currently no built-in tooling to resolve this, Kafka's own > > > > > kafka-transactions.sh cannot commit Flink transactions since that > > > requires > > > > > Flink-specific internals. > > > > > This FLIP proposes a standalone CLI tool, that allows operators to > > > abort or > > > > > commit lingering transactions without a running Flink cluster. > > > > > Looking forward to your feedback. > > > > > > > > > > Kind regards, > > > > > Aleksandr Savonin > > > > > > > > > > [1] > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-572%3A+Introduce+Flink-Kafka+Transactions+Management+Tool > > > > > > > > > > > > > > > > > -- > > > Kind regards, > > > Aleksandr > > > > > > > -- > Kind regards, > Aleksandr >
