[ https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696923#comment-17696923 ]
Sagar Rao edited comment on KAFKA-14091 at 3/6/23 3:36 PM:
-----------------------------------------------------------

Hi [~ChrisEgerton], I started working on this and have a clarifying question.

The workaround for hanging transactions, having the leader proactively abort open transactions, makes sense to me. The problem is that the information about whether a set of workers departed the group during this round of rebalancing is only available deep within the `IncrementalCooperativeAssignor` class, specifically within `handleLostAssignments`. How would we close/abort the open transactions at that point?

If exactly-once support (EOS) is enabled, I can think of zombie-fencing the tasks of lost assignments. IIUC, this should abort all of their open transactions. But for that, I would need to expose some state, such as the Worker, to the WorkerCoordinator (which gets passed to the ICA) and change some of the method signatures. I also see there's an `abortTransaction` method in `TransactionContext`, but I'm not sure how that can be leveraged.

Or am I totally missing the point here?

> Suddenly-killed tasks can leave hanging transactions open
> ---------------------------------------------------------
>
>                 Key: KAFKA-14091
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14091
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Sagar Rao
>            Priority: Major
>
> Right now, if a task running with exactly-once support is killed
> ungracefully, it may leave a hanging transaction open. If the transaction
> included writes to the global offsets topic, then startup for future workers
> becomes blocked on that transaction expiring.
> Ideally, we could identify these kinds of hanging transactions and
> proactively abort them.
> Unfortunately, there are a few facts that make this fairly complicated:
> # Workers read to the end of the offsets topic during startup, before
> joining the cluster
> # Workers do not know which tasks they are assigned until they join the
> cluster
> The result of these facts is that we cannot trust workers that are restarted
> shortly after being ungracefully shut down to fence out their own hanging
> transactions, since any hanging transactions would prevent them from being
> able to join the group and receive their task assignment in the first place.
> We could possibly accomplish this by having the leader proactively abort any
> open transactions for tasks on workers that appear to have left the cluster
> during a rebalance. This would not require us to wait for the scheduled
> rebalance delay to elapse, since the intent of the delay is to provide a
> buffer between when workers leave and when their connectors/tasks are
> reallocated across the cluster (and, if the worker is able to rejoin before
> that buffer is consumed, then give it back the same connectors/tasks it was
> running previously); aborting transactions for tasks on these workers would
> not interfere with that goal.
>
> It's also possible that we may have to handle the case where a
> [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
> task leaves a transaction open; I have yet to confirm whether this is
> possible, though.
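The leader-side proposal can be sketched at the level of bookkeeping only. The sketch below, a minimal self-contained Java illustration that does not talk to Kafka, takes the previous round's assignments and the members that rejoined this round, collects the tasks that were running on departed workers (roughly the information the runtime derives in `IncrementalCooperativeAssignor#handleLostAssignments`), and derives the transactional IDs that would need to be fenced. The class, record, and method names (`LostTaskFencingSketch`, `TaskId`, `tasksOnDepartedWorkers`, `transactionalIdsToFence`) are hypothetical. The `<groupId>-<connector>-<taskNumber>` transactional ID format is an assumption based on KIP-618 and should be checked against the runtime. The actual fencing would have to go through something like the admin client's `fenceProducers`, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: bookkeeping for "leader fences tasks of departed workers".
public class LostTaskFencingSketch {

    // Hypothetical stand-in for a Connect task ID (connector name + task number).
    record TaskId(String connector, int task) {}

    // Collect every task that was assigned last round to a worker
    // that is not a member of the group in this round.
    static List<TaskId> tasksOnDepartedWorkers(Map<String, List<TaskId>> previousAssignments,
                                               Set<String> currentMembers) {
        List<TaskId> lost = new ArrayList<>();
        for (Map.Entry<String, List<TaskId>> entry : previousAssignments.entrySet()) {
            if (!currentMembers.contains(entry.getKey())) {
                lost.addAll(entry.getValue());
            }
        }
        return lost;
    }

    // ASSUMED transactional ID format <groupId>-<connector>-<taskNumber> (per KIP-618);
    // verify against the Connect runtime before relying on it.
    static List<String> transactionalIdsToFence(String groupId, List<TaskId> lostTasks) {
        List<String> ids = new ArrayList<>();
        for (TaskId t : lostTasks) {
            ids.add(groupId + "-" + t.connector() + "-" + t.task());
        }
        return ids;
    }

    public static void main(String[] args) {
        // TreeMap keeps worker iteration order deterministic for the printout.
        Map<String, List<TaskId>> previous = new TreeMap<>();
        previous.put("worker-1", List.of(new TaskId("src", 0)));
        previous.put("worker-2", List.of(new TaskId("src", 1), new TaskId("other", 0)));
        Set<String> current = Set.of("worker-1"); // worker-2 did not rejoin

        List<TaskId> lost = tasksOnDepartedWorkers(previous, current);
        System.out.println(transactionalIdsToFence("connect-cluster", lost));
        // prints [connect-cluster-src-1, connect-cluster-other-0]
    }
}
```

Keeping the fencing step separate from reassignment means the leader could act as soon as it notices a departure, while the scheduled rebalance delay still decides when the lost tasks are handed to other workers. The issue description argues for exactly this separation.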