[
https://issues.apache.org/jira/browse/KAFKA-20029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18048987#comment-18048987
]
Andrew Schofield commented on KAFKA-20029:
------------------------------------------
Similar problems exist for the other internal topics {{__consumer_offsets}} and
{{__share_group_state}}. I think this is a sensible issue to discuss, and
potentially a similar solution could be employed for all three of these topics.
> Disallow partition count increase for __transaction_state.
> ----------------------------------------------------------
>
> Key: KAFKA-20029
> URL: https://issues.apache.org/jira/browse/KAFKA-20029
> Project: Kafka
> Issue Type: Improvement
> Reporter: sanghyeok An
> Assignee: sanghyeok An
> Priority: Minor
>
> The routing logic for the Transaction Coordinator relies heavily on the
> partition count of the internal topic {{{}__transaction_state{}}}. The
> mapping is determined by
> {code:java}
> Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount {code}
> Consequently, changing the number of partitions for {{__transaction_state}}
> (e.g., expanding from 50 to 150) changes the mapping logic. While Kafka
> documentation advises against modifying internal topics, there is currently
> no hard guardrail preventing this via the Admin API.
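> As a quick illustration (a standalone sketch, not the actual Kafka class; the
> transactional id is hypothetical), the mapping for the same id generally
> changes once the divisor changes:
> {code:java}
> // Standalone sketch: shows how the coordinator partition derived from the same
> // transactional.id shifts when the __transaction_state partition count changes.
> public class PartitionMappingSketch {
>     // Equivalent of org.apache.kafka.common.utils.Utils.abs(int)
>     static int abs(int n) {
>         return n & 0x7fffffff;
>     }
>
>     static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
>         return abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
>     }
>
>     public static void main(String[] args) {
>         String txnId = "my-transactional-id"; // hypothetical id
>         System.out.println("50 partitions  -> " + partitionFor(txnId, 50));
>         System.out.println("150 partitions -> " + partitionFor(txnId, 150));
>         // The two results generally differ, so existing transactional ids are
>         // routed to a different coordinator partition after the expansion.
>     }
> }
> {code}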
> IMHO, allowing this operation can lead to a split-brain scenario during a
> rolling upgrade or cluster expansion, resulting in orphan or hanging
> transactions.
>
> *Scenario & Analysis*
> Here is a breakdown of the race condition and inconsistency issues:
> # *Expansion:* An admin expands {{__transaction_state}} partitions from 50
> to 150. New partitions are created and leaders are elected.
> # *Metadata Inconsistency:* While the metadata cache updates, the
> {{transactionTopicPartitionCount}} used by the {{partitionFor}} logic might not
> update atomically across all nodes.
> # *Loading Failure:* A coordinator elected for a new partition might fail to
> load the transaction state in {{TransactionCoordinator#onElection}} due to a
> mismatch between the cached partition count and the actual partition count,
> throwing an exception.
> # *Split-Brain View:*
> ** *New Broker B* (starts up or receives update): Sees 150 partitions.
> ** {*}Old Broker A{*}: Still operates with the logic based on 50 partitions.
> # *Routing Divergence:*
> ** A client sends {{FindCoordinator}} for Transaction ID "A".
> ** *Broker B* calculates: {{hash("A") % 150}} -> Partition {*}130{*}.
> ** *Broker A* calculates: {{hash("A") % 50}} -> Partition {*}30{*}.
> # *Non-deterministic behavior occurs:*
> ** If the producer sends {{FindCoordinator}} for Transaction ID "A" to
> *Broker B*, it is told partition {*}130{*}; if it sends the same request to
> {*}Broker A{*}, it is told partition {*}30{*}.
> ** This means the producer will send {{InitProducerId}} to the leader of
> either partition *130* or {*}30{*}. (In either case, that partition's leader
> is hosted on one of the existing brokers, which may still be using the old
> count.)
> ** If the transaction coordinator receiving the {{InitProducerId}} request for
> partition 130 is an old broker, its *transactionTopicPartitionCount* is still
> {*}50{*}, so its call to *partitionFor(...)* re-derives a partition in the
> range {*}(0 ~ 49){*}, which can never be 130. It means that
> *** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
> *** The transaction state may be written to an unexpected __transaction_state
> partition (for example, partition 30, the result of re-calculating with the
> old count, instead of the expected partition 130). A sketch of this
> re-derivation mismatch follows the list below.
> **** This causes orphan transactions. For example, when the brokers are
> rolled, they try to restore the transaction state; they expect the transaction
> for "A" to be in partition 130, but it is actually in partition 30, so data
> topics may be left with hanging transactions.
> ** In addition, since the leaders of partition 130 and partition 30 are likely
> different, two coordinators could end up managing the same Transaction ID
> without knowledge of each other.
>
> *Impact*
> This non-deterministic behavior causes:
> * *Orphan/Hanging Transactions:* The producer interacts with a new
> coordinator that has no history of the transaction.
> * *Availability Issues:* Clients may receive
> {{COORDINATOR_LOAD_IN_PROGRESS}} indefinitely if the coordinator fails to
> load the state due to count mismatch.
> * Potential worst-case correctness risk (not asserted as guaranteed):
> depending on the timing, the transition could increase the risk of unexpected
> fencing/coordination behavior.
>
>
> To ensure cluster stability and data integrity, how about enforcing a
> guardrail against modifying the partition count of __transaction_state?
> *1. Enforce validation in Controller/AdminManager*
> It would be good to reject {{CreatePartitions}} requests for internal topics
> by default.
> * Introduce a mechanism to check if the topic is internal during partition
> expansion.
> * If {{topic.isInternal()}} is true, return an {{InvalidRequestException}}
> with a clear error message stating that internal topic partition counts
> cannot be changed dynamically.
> * Introduce a safety configuration on the controller side (e.g.,
> {{{}internal.topic.modification.enable{}}}, default {{{}false{}}}) for
> advanced users who genuinely need to override this behavior, although doing
> so is strongly discouraged. A minimal sketch of this validation is shown
> below.
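> A minimal sketch of the proposed check, assuming a hypothetical validation
> hook on the controller/admin path for {{CreatePartitions}} handling and the
> override config proposed above (illustrative only, not existing Kafka code):
> {code:java}
> import org.apache.kafka.common.errors.InvalidRequestException;
>
> // Illustrative guardrail: reject partition-count changes for internal topics
> // unless the (proposed) override config is enabled.
> public class InternalTopicPartitionGuard {
>     // Proposed config: internal.topic.modification.enable, default false
>     private final boolean internalTopicModificationEnable;
>
>     public InternalTopicPartitionGuard(boolean internalTopicModificationEnable) {
>         this.internalTopicModificationEnable = internalTopicModificationEnable;
>     }
>
>     // Hypothetical hook invoked while validating a partition-count increase.
>     public void validatePartitionIncrease(String topic, boolean isInternal) {
>         if (isInternal && !internalTopicModificationEnable) {
>             throw new InvalidRequestException(
>                 "Partition count of internal topic '" + topic + "' cannot be changed " +
>                 "dynamically. Set internal.topic.modification.enable=true to override " +
>                 "(strongly discouraged).");
>         }
>     }
> }
> {code}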
> *2. CLI Warning*
> Add a warning message or require a *{{--force}}* flag in the
> *{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note
> that this is a soft guardrail: it does not prevent programmatic changes made
> directly via the Admin API or by third-party clients (for example, Go, Rust,
> or Python tooling).
>
> Although the documentation explicitly advises against modifying the partition
> count of __transaction_state, the system currently permits it. This
> discrepancy creates a significant risk of critical human error. It would be
> good to enforce this safety constraint at the code level to ensure cluster
> stability.
> I would greatly appreciate the Kafka community's feedback on this issue.
> Thanks always.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)