[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340576#comment-17340576 ]

Travis Bischel commented on KAFKA-12671:
----------------------------------------

After much investigation, a potential fix is to issue an AddPartitionsToTxn 
request with the producer ID, epoch, and transactional ID that caused partitions 
to get stuck, as well as _all_ topics/partitions that are stuck, and then follow 
that with an EndTxn request (with the same producer ID, producer epoch, and 
transactional ID).

Finding the pid, producer epoch, and transactional ID is the hard part.
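
A rough sketch of that unstick flow (illustrative Go; the connection type and 
its methods are hypothetical placeholders for any client that can issue raw 
protocol requests, and discovering the three identifiers is left out because, 
as noted, that is the hard part):

{noformat}
// Sketch of the unstick flow: re-open the orphaned transaction and
// immediately abort it so the coordinator writes markers for every stuck
// partition, letting the LastStableOffset advance again. RawTxnConn is a
// hypothetical stand-in for a low-level client that can send raw
// AddPartitionsToTxn / EndTxn requests to the transaction coordinator.
package sketch

import "context"

type RawTxnConn interface {
	AddPartitionsToTxn(ctx context.Context, txnID string, pid int64, epoch int16, partitions map[string][]int32) error
	EndTxn(ctx context.Context, txnID string, pid int64, epoch int16, commit bool) error
}

func unstickTxn(ctx context.Context, conn RawTxnConn, txnID string, pid int64, epoch int16, stuck map[string][]int32) error {
	// 1) Re-add all stuck topic/partitions to the transaction so the
	//    coordinator knows to write markers for them.
	if err := conn.AddPartitionsToTxn(ctx, txnID, pid, epoch, stuck); err != nil {
		return err
	}
	// 2) Abort: the coordinator sends WriteTxnMarkers to the partition
	//    leaders, which clears the hung transaction from the
	//    ProducerStateManager.
	return conn.EndTxn(ctx, txnID, pid, epoch, false)
}
{noformat}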

> Out of order processing with a transactional producer can lead to a stuck 
> LastStableOffset
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12671
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
>            Reporter: Travis Bischel
>            Priority: Blocker
>              Labels: Transactions
>
> If there is pathological processing of incoming produce requests and EndTxn 
> requests, then the LastStableOffset can get stuck, which will block consuming 
> in READ_COMMITTED mode.
> To transactionally produce, the standard flow is to InitProducerId, and then 
> loop AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is 
> responsible for fencing and for adding partitions to a transaction, and 
> EndTxn is responsible for finishing the transaction. Producing itself is 
> mostly uninvolved in the proper fencing / ending flow, but produce requests 
> are required to be issued after AddPartitionsToTxn and before EndTxn.
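> For reference, here is roughly what that loop looks like from the client 
> side, sketched with the Go kgo client that produced the request logs below 
> (the exact option and method names should be treated as approximate, not 
> authoritative):
> {noformat}
> // Sketch: the standard transactional produce loop. On the wire this becomes
> // FindCoordinator + InitProducerId at client setup, then, per transaction,
> // AddPartitionsToTxn -> Produce+ -> EndTxn.
> package main
> 
> import (
> 	"context"
> 
> 	"github.com/twmb/franz-go/pkg/kgo"
> )
> 
> func main() {
> 	ctx := context.Background()
> 	cl, err := kgo.NewClient(
> 		kgo.SeedBrokers("localhost:9092"),
> 		kgo.TransactionalID("my-txn-id"), // FindCoordinator + InitProducerId
> 	)
> 	if err != nil {
> 		panic(err)
> 	}
> 	defer cl.Close()
> 
> 	if err := cl.BeginTransaction(); err != nil { // client-side state only
> 		panic(err)
> 	}
> 	// Producing to a new partition first adds it to the transaction
> 	// (AddPartitionsToTxn), then writes the records (Produce).
> 	rec := &kgo.Record{Topic: "t", Value: []byte("v")}
> 	if err := cl.ProduceSync(ctx, rec).FirstErr(); err != nil {
> 		panic(err)
> 	}
> 	// EndTxn commits or aborts; the coordinator then writes transaction
> 	// markers to the partition leaders.
> 	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
> 		panic(err)
> 	}
> }
> {noformat}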
> When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
> to mildly manage transactions. The ProducerStateManager is completely 
> independent of the TxnCoordinator, and its guarantees are relatively weak. 
> The ProducerStateManager handles two types of "batches" being added: a data 
> batch and a transaction marker. When a data batch is added, a "transaction" 
> is begun and tied to the producer ID that is producing the batch. When a 
> transaction marker is handled, the ProducerStateManager removes the 
> transaction for the producer ID (roughly).
> EndTxn is what triggers transaction markers to be sent to the 
> ProducerStateManager. In essence, EndTxn is the one part of the transactional 
> producer flow that talks across both the TxnCoordinator and the 
> ProducerStateManager.
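> A simplified model of that bookkeeping (illustrative Go; the real logic is 
> Scala inside the broker, and this only mirrors the begin-on-data-batch / 
> clear-on-marker behavior described above):
> {noformat}
> // Illustrative model: a transactional data batch opens an entry keyed by
> // producer ID, a transaction marker removes it, and the LastStableOffset
> // cannot advance past the first offset of any still-open entry.
> package sketch
> 
> type openTxn struct {
> 	firstOffset int64
> }
> 
> type producerStateModel struct {
> 	open map[int64]openTxn // producer ID -> ongoing transaction
> }
> 
> func (m *producerStateModel) onDataBatch(pid, baseOffset int64) {
> 	// If this runs *after* the marker for the same transaction, the entry is
> 	// re-created here and nothing will ever remove it again (the bug below).
> 	if _, ok := m.open[pid]; !ok {
> 		m.open[pid] = openTxn{firstOffset: baseOffset}
> 	}
> }
> 
> func (m *producerStateModel) onTxnMarker(pid int64) {
> 	delete(m.open, pid) // transaction finished (commit or abort)
> }
> 
> func (m *producerStateModel) lastStableOffset(highWatermark int64) int64 {
> 	lso := highWatermark
> 	for _, t := range m.open {
> 		if t.firstOffset < lso {
> 			lso = t.firstOffset
> 		}
> 	}
> 	return lso
> }
> {noformat}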
> If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
> after EndTxn, then the ProduceRequest will begin a new transaction in the 
> ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
> final request issued, the new transaction created in ProducerStateManager is 
> orphaned and nothing can clean it up. The LastStableOffset then hangs on 
> this orphaned transaction.
> This same problem can be triggered by a produce request that is issued with a 
> transactional ID outside of the context of a transaction at all (no 
> AddPartitionsToTxn). This problem cannot be triggered by producing for so 
> long that the transaction expires; the difference here is that the 
> transaction coordinator bumps the epoch for the producer ID, thus producing 
> again with the old epoch does not work.
> Theoretically, we are supposed to have unlimited retries on produce requests, 
> but in the context of wanting to abort everything and shut down, this is not 
> always feasible. As it currently stands, I'm not sure there's a truly safe 
> way to shut down _without_ flushing and receiving responses for every record 
> produced, even if I want to abort everything and quit. The safest approach I 
> can think of is to actually avoid issuing an EndTxn so that instead we rely 
> on Kafka internally to time things out after a period of time.
> —
> For some context, here's my request logs from the client. Note that I write 
> two ProduceRequests, read one, and then issue EndTxn (because I know I want 
> to quit). The second ProduceRequest is read successfully before shutdown, but 
> I ignore the results because I am shutting down. I've taken out logs related 
> to consuming, but the order of the logs is unchanged:
> {noformat}
> [INFO] done waiting for unknown topic, metadata was successful; topic: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
> [INFO] initializing producer id
> [DEBUG] wrote FindCoordinator v3; err: <nil>
> [DEBUG] read FindCoordinator v3; err: <nil>
> [DEBUG] wrote InitProducerID v4; err: <nil>
> [DEBUG] read InitProducerID v4; err: <nil>
> [INFO] producer id initialization success; id: 1463, epoch: 0
> [DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
> [DEBUG] read AddPartitionsToTxn v2; err: <nil>
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
> [DEBUG] wrote Produce v8; err: <nil>
> [DEBUG] wrote EndTxn v2; err: <nil>
> [DEBUG] read EndTxn v2; err: <nil>
> [DEBUG] read from broker errored, killing connection; addr: localhost:9092, 
> id: 1, successful_reads: 1, err: context canceled
> [DEBUG] read Produce v8; err: <nil>
> [DEBUG] produced; to: 
> 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
> {noformat}
> And then from the broker's point of view. Across two brokers, the second 
> ProduceRequest is completed after EndTxn is handled (and after the 
> WriteTxnMarkers request is handled, which is the important one that hooks 
> into the ProducerStateManager):
> {noformat}
> /// Broker 3: init producer ID
> [2021-04-15 00:56:40,030] DEBUG Completed 
> request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, 
> correlationId=3) -- 
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}}
>  from connection 
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
>  softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 3: add partitions to txn
> [2021-04-15 00:56:40,071] DEBUG Completed 
> request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2, 
> clientId=kgo, correlationId=4) -- 
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]}
>  from connection 
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
>  softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: first produce
> [2021-04-15 00:56:40,223] DEBUG Completed 
> request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, 
> correlationId=1) -- 
> {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
>  from connection 
> 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
>  softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)
> /// Broker 3: end txn
> [2021-04-15 00:56:40,350] DEBUG Completed 
> request:RequestHeader(apiKey=END_TXN, apiVersion=2, clientId=kgo, 
> correlationId=5) -- 
> {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0}
>  from connection 
> 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
>  softwareVersion=0.1.0) (kafka.request.logger)
> /// Broker 2: txn markers
> [2021-04-15 00:56:40,357] DEBUG Completed 
> request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, 
> clientId=broker-3-txn-marker-sender, correlationId=66708) -- 
> {markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]}
>  from connection 
> 127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown,
>  softwareVersion=unknown) (kafka.request.logger)
> /// Broker 2: second produce
> [2021-04-15 00:56:40,374] DEBUG Completed 
> request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, 
> correlationId=2) -- 
> {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0}
>  from connection 
> 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo,
>  softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
> {noformat}
> —
> I believe that one fix for this would be to only allow transactions to start 
> in the ProducerStateManager if a transaction has actually begun through 
> AddPartitionsToTxn, and to reject produce requests to partitions that have 
> not been added to a txn. An alternative fix would be to just wait for all 
> produce requests to finish before issuing EndTxn, but this seems less 
> desirable when wanting to shut down and abort progress. Another alternative 
> is to avoid issuing EndTxn and to just shut down, but this also seems 
> undesirable and will block consumers until the transaction timeout expires.
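> A sketch of what the first fix could look like on the produce path 
> (illustrative Go, not the actual broker code, which is Scala; the 
> partitionInTxn flag stands in for whatever transaction state the partition 
> leader could consult that is established by AddPartitionsToTxn):
> {noformat}
> // Illustrative check: refuse to begin a transaction in producer state for a
> // partition that was never added to an ongoing transaction via
> // AddPartitionsToTxn, instead of silently opening one that can be orphaned.
> package sketch
> 
> import "errors"
> 
> func maybeBeginTxnOnAppend(pid int64, epoch int16, partitionInTxn bool) error {
> 	if !partitionInTxn {
> 		// Today this case silently begins a new "transaction" in the
> 		// ProducerStateManager; rejecting it would surface the race to the
> 		// client and keep the LastStableOffset from getting stuck.
> 		return errors.New("partition not added to an ongoing transaction")
> 	}
> 	// ...otherwise proceed with the normal append and producer-state update...
> 	return nil
> }
> {noformat}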
> This may be the cause of KAFKA-5880.


