[ 
https://issues.apache.org/jira/browse/KAFKA-20029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18048987#comment-18048987
 ] 

Andrew Schofield commented on KAFKA-20029:
------------------------------------------

Similar problems exist for the other internal topics {{__consumer_offsets}} and 
{{__share_group_state}}. I think this is a sensible issue to discuss, and 
potentially a similar solution could be employed for all three of these topics.

> Disallow partition count increase for __transaction_state.
> ----------------------------------------------------------
>
>                 Key: KAFKA-20029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20029
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>
> The routing logic for the Transaction Coordinator relies heavily on the 
> partition count of the internal topic {{__transaction_state}}. The 
> mapping is determined by
> {code:java}
> Utils.abs(transactionId.hashCode()) % transactionTopicPartitionCount {code}
> Consequently, changing the number of partitions for {{__transaction_state}} 
> (e.g., expanding from 50 to 150) changes the mapping. While the Kafka 
> documentation advises against modifying internal topics, there is currently 
> no hard guardrail preventing this via the Admin API.
> IMHO, allowing this operation can lead to a split-brain scenario during a 
> rolling upgrade or cluster expansion, resulting in orphaned and hanging 
> transactions.
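>  
> As a quick illustration of how the mapping shifts, here is a minimal, 
> self-contained sketch. The local {{abs}} helper merely stands in for 
> {{Utils.abs}}, and the transactional ID and partition counts are only 
> examples:
> {code:java}
> public class TxnPartitionMappingDemo {
> 
>     // Stand-in for Utils.abs; the exact edge-case handling (Integer.MIN_VALUE)
>     // does not matter for this illustration.
>     static int abs(int n) {
>         return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
>     }
> 
>     // Same shape as the routing formula quoted above.
>     static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
>         return abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
>     }
> 
>     public static void main(String[] args) {
>         String transactionalId = "A";
>         // The same ID generally maps to different partitions once the count changes,
>         // so every broker must agree on the count or routing diverges.
>         System.out.println("50 partitions  -> " + partitionFor(transactionalId, 50));
>         System.out.println("150 partitions -> " + partitionFor(transactionalId, 150));
>     }
> }
> {code}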
>  
> *Scenario & Analysis*
> Here is a breakdown of the race condition and inconsistency issues:
>  # *Expansion:* An admin expands {{__transaction_state}} partitions from 50 
> to 150. New partitions are created and leaders are elected.
>  # *Metadata Inconsistency:* While the metadata cache updates, the 
> {{transactionTopicPartitionCount}} used by the {{partitionsFor}} logic might 
> not update atomically across all nodes.
>  # *Loading Failure:* A coordinator elected for a new partition might fail to 
> load the transaction state in {{TransactionCoordinator#onElection}} due to a 
> mismatch between the cached partition count and the actual partition count, 
> throwing an exception.
>  # *Split-Brain View:*
>  ** *New Broker B* (starts up or receives update): Sees 150 partitions.
>  ** {*}Old Broker A{*}: Still operates with the logic based on 50 partitions.
>  # *Routing Divergence:*
>  ** A client sends {{FindCoordinator}} for Transaction ID "A".
>  ** *Broker B* calculates: {{hash("A") % 150}} -> Partition {*}130{*}.
>  ** *Broker A* calculates: {{hash("A") % 50}} -> Partition {*}27{*}.
>  # *Non-deterministic behavior occurs:*
>  ** If the producer sends {{FindCoordinator}} for Transaction ID "A" to *Broker B*, 
> it is told partition *130*; if it sends the same request to *Broker A*, it is 
> told partition *27*.
>  ** This means the producer will send {{InitProducerId}} to the leader of either 
> partition *130* or partition *27* (in either case, the leaders of partitions 27 
> and 130 are among the existing brokers, which may still hold the old view).
>  ** If a TransactionCoordinator on an old broker receives the {{InitProducerId}} 
> request routed to partition 130, it calls *partitionsFor(...)* to locate the 
> transaction state. Because its *transactionTopicPartitionCount* is still *50*, 
> the recalculated partition can be anything in *(0 ~ 49)* (see the sketch after 
> this list). As a result:
>  *** {{Errors.COORDINATOR_LOAD_IN_PROGRESS}} can occur.
>  *** The transaction state may be written to an unexpected {{__transaction_state}} 
> partition (for example, partition 11 instead of the expected partition 130; the 
> exact value is just whatever the recalculation yields).
>  **** This causes orphaned transactions. For example, when the brokers roll and 
> try to restore transaction state, they expect the transaction for "A" to be in 
> partition 130, but it is actually in partition 11. Data topics may then suffer 
> from hanging transactions.
>  ** In addition, since the leaders of partition 130 and partition 27 are likely 
> different, two coordinators may end up managing the same Transaction ID without 
> knowledge of each other.
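>  
> To make the split-brain routing above concrete, here is a minimal, 
> self-contained sketch (the one referenced in the step about recalculation). It 
> does not use the real coordinator code paths, and the printed partition numbers 
> will of course differ from the illustrative 27/130 above:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> public class SplitBrainRoutingDemo {
> 
>     static int abs(int n) {
>         return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
>     }
> 
>     static int partitionFor(String transactionalId, int partitionCount) {
>         return abs(transactionalId.hashCode()) % partitionCount;
>     }
> 
>     public static void main(String[] args) {
>         String txnId = "A";
> 
>         // A broker with the new view (150 partitions) answers FindCoordinator.
>         int routedPartition = partitionFor(txnId, 150);
> 
>         // The coordinator handling InitProducerId still holds the old view
>         // (50 partitions) and writes the state where its own recalculation points.
>         int writtenPartition = partitionFor(txnId, 50);
>         Map<Integer, String> statePartitions = new HashMap<>();
>         statePartitions.put(writtenPartition, "state of " + txnId);
> 
>         // Later (e.g. after a restart) the state is expected on the routed
>         // partition, but it was written somewhere in [0, 49].
>         System.out.println("Expected on partition " + routedPartition
>             + ", found there: " + statePartitions.get(routedPartition));
>         System.out.println("Actually written to partition " + writtenPartition);
>     }
> }
> {code}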
>  
> *Impact*
> This non-deterministic behavior causes:
>  * *Orphan/Hanging Transactions:* The producer interacts with a new 
> coordinator that has no history of the transaction.
>  * *Availability Issues:* Clients may receive 
> {{COORDINATOR_LOAD_IN_PROGRESS}} indefinitely if the coordinator fails to 
> load the state due to the count mismatch.
>  * *Potential Correctness Risk:* Depending on the timing, the transition could 
> increase the risk of unexpected fencing/coordination behavior (a worst-case 
> possibility, not asserted as guaranteed).
>  
>  
> To ensure cluster stability and data integrity, how about enforcing a 
> guardrail against modifying the partition count of {{__transaction_state}}?
> *1. Enforce validation in Controller/AdminManager*
> It would be good to reject partition-count increase requests (the 
> {{CreatePartitions}} path) for internal topics by default, along the lines of 
> the sketch after this list.
>  * Introduce a mechanism to check if the topic is internal during partition 
> expansion.
>  * If {{topic.isInternal()}} is true, return an {{InvalidRequestException}} 
> with a clear error message stating that internal topic partition counts 
> cannot be changed dynamically.
>  * Introduce a safety configuration on the controller side (e.g., 
> {{internal.topic.modification.enable}}, default {{false}}) for advanced users 
> who strictly need to override this behavior, although it is strongly 
> discouraged.
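> A rough sketch of what such a check could look like. The class and method names 
> are hypothetical, {{internal.topic.modification.enable}} is the proposed (not 
> yet existing) config, and only {{InvalidRequestException}} is an existing Kafka 
> class (from kafka-clients):
> {code:java}
> import org.apache.kafka.common.errors.InvalidRequestException;
> 
> // Hypothetical guardrail for the partition-increase path on the controller side.
> public final class InternalTopicPartitionGuard {
> 
>     // Proposed config, default false.
>     private final boolean internalTopicModificationEnabled;
> 
>     public InternalTopicPartitionGuard(boolean internalTopicModificationEnabled) {
>         this.internalTopicModificationEnabled = internalTopicModificationEnabled;
>     }
> 
>     /** Rejects partition-count changes for internal topics unless explicitly enabled. */
>     public void validatePartitionIncrease(String topic, boolean isInternal,
>                                           int oldCount, int newCount) {
>         if (isInternal && newCount != oldCount && !internalTopicModificationEnabled) {
>             throw new InvalidRequestException("Partition count of internal topic '" + topic
>                 + "' cannot be changed dynamically (" + oldCount + " -> " + newCount + ").");
>         }
>     }
> }
> {code}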
> *2. CLI Warning*
> Add a warning message or require a *{{--force}}* flag in the 
> *{{kafka-topics.sh}}* CLI when attempting to modify internal topics. Note 
> that this is a soft guardrail and does not prevent programmatic changes via 
> the Admin API (for example, from Go, Rust, or Python clients).
>  
> Although the documentation explicitly advises against modifying the partition 
> count of {{__transaction_state}}, the system currently permits it. This 
> discrepancy creates a significant risk of critical human error. It would be 
> good to enforce this safety constraint at the code level to ensure cluster 
> stability.
> I would greatly appreciate the Kafka community's feedback on this issue.
> Thanks always.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
