[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Travis Bischel updated KAFKA-12671:
-----------------------------------
    Priority: Major  (was: Blocker)

Out of order processing with a transactional producer can lead to a stuck LastStableOffset
-------------------------------------------------------------------------------------------

                 Key: KAFKA-12671
                 URL: https://issues.apache.org/jira/browse/KAFKA-12671
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
            Reporter: Travis Bischel
            Priority: Major
              Labels: Transactions

If there is pathological processing of incoming produce requests and EndTxn requests, the LastStableOffset can get stuck, which blocks consuming in READ_COMMITTED mode.

To produce transactionally, the standard flow is InitProducerId, then a loop of AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is responsible for fencing and for adding partitions to a transaction, and EndTxn is responsible for finishing the transaction. Producing itself is mostly uninvolved with the fencing / ending flow, but produce requests are required to arrive after AddPartitionsToTxn and before EndTxn.

When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to loosely manage transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. The ProducerStateManager handles two types of "batches" being added: a data batch and a transaction marker. When a data batch is added, a "transaction" is begun and tied to the producer ID that produced the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for that producer ID (roughly).

EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager.

If a ProduceRequest is issued before EndTxn but handled internally in Kafka after EndTxn, then the ProduceRequest begins a new transaction in the ProducerStateManager. If the client was disconnecting and the EndTxn was the final request issued, the new transaction created in the ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs on this orphaned transaction.

The same problem can be triggered by a produce request that is issued with a transactional ID outside the context of a transaction at all (no AddPartitionsToTxn). The problem cannot be triggered by producing for so long that the transaction expires; in that case the transaction coordinator bumps the epoch for the producer ID, so producing again with the old epoch does not work.

Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there's a truly safe way to shut down _without_ flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to avoid issuing an EndTxn at all and instead rely on Kafka to time the transaction out internally.
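For reference, the flow above maps onto client API calls roughly as follows. This is only an illustrative sketch against the Java producer API (the request logs below are from the kgo Go client; the topic, transactional.id, and serializer values here are placeholders), and whether a given client can actually interleave the final Produce and the EndTxn this way depends on how it pipelines requests:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "example-txn-id"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();  // FindCoordinator + InitProducerId
            producer.beginTransaction();  // AddPartitionsToTxn goes out with the first send to a partition

            producer.send(new ProducerRecord<>("example-topic", "k1", "v1")); // Produce #1
            producer.send(new ProducerRecord<>("example-topic", "k2", "v2")); // Produce #2

            // The ordering in this report: EndTxn (abort) is issued while Produce #2 is
            // still in flight. If the broker applies the WriteTxnMarkers before that
            // produce, the late produce re-opens a "transaction" in the
            // ProducerStateManager that nothing ever closes, and the LastStableOffset sticks.
            producer.abortTransaction();  // EndTxn(committed=false)
        }
    }
}
{code}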
----

For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the results because I am shutting down. I've taken out logs related to consuming, but the order of the logs is unchanged:

{noformat}
[INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
[INFO] initializing producer id
[DEBUG] wrote FindCoordinator v3; err: <nil>
[DEBUG] read FindCoordinator v3; err: <nil>
[DEBUG] wrote InitProducerID v4; err: <nil>
[DEBUG] read InitProducerID v4; err: <nil>
[INFO] producer id initialization success; id: 1463, epoch: 0
[DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
[DEBUG] read AddPartitionsToTxn v2; err: <nil>
[DEBUG] wrote Produce v8; err: <nil>
[DEBUG] read Produce v8; err: <nil>
[DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
[DEBUG] wrote Produce v8; err: <nil>
[DEBUG] wrote EndTxn v2; err: <nil>
[DEBUG] read EndTxn v2; err: <nil>
[DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled
[DEBUG] read Produce v8; err: <nil>
[DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
{noformat}

And here is the same sequence from the brokers' point of view. Across the two brokers, the second ProduceRequest is completed after EndTxn is handled (and after the WriteTxnMarkers request is handled, which is the important one that hooks into the ProducerStateManager):

{noformat}
/// Broker 3: init producer ID
[2021-04-15 00:56:40,030] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, correlationId=3) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)

/// Broker 3: add partitions to txn
[2021-04-15 00:56:40,071] DEBUG Completed request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2, clientId=kgo, correlationId=4) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)

/// Broker 2: first produce
[2021-04-15 00:56:40,223] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=1) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)

/// Broker 3: end txn
[2021-04-15 00:56:40,350] DEBUG Completed request:RequestHeader(apiKey=END_TXN, apiVersion=2, clientId=kgo, correlationId=5) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)

/// Broker 2: txn markers
[2021-04-15 00:56:40,357] DEBUG Completed request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, clientId=broker-3-txn-marker-sender, correlationId=66708) -- {markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]} from connection 127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown, softwareVersion=unknown) (kafka.request.logger)

/// Broker 2: second produce
[2021-04-15 00:56:40,374] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=2) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
{noformat}

----

I believe one fix for this would be to only allow transactions to start in the ProducerStateManager if a transaction has actually begun through AddPartitionsToTxn, and to reject produce requests to partitions that have not been added to a txn (sketched roughly below).
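Here is a hypothetical sketch of that bookkeeping. None of these class or method names exist in Kafka, and it deliberately glosses over how a partition leader would learn what the TxnCoordinator has added via AddPartitionsToTxn (today that information never reaches the ProducerStateManager, which is exactly the gap); it only illustrates the gate being proposed:

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not actual Kafka code.
class ProducerStateSketch {
    // producerId -> partitions that AddPartitionsToTxn has registered for the open txn
    private final Map<Long, Set<String>> addedPartitions = new HashMap<>();

    // Would be driven (somehow) by the TxnCoordinator handling AddPartitionsToTxn.
    void onAddPartitionsToTxn(long producerId, String topicPartition) {
        addedPartitions.computeIfAbsent(producerId, id -> new HashSet<>()).add(topicPartition);
    }

    // Called when a transactional data batch arrives. Today a late batch silently
    // (re)opens a transaction; the proposal is to reject it instead.
    boolean tryBeginTransaction(long producerId, String topicPartition) {
        Set<String> added = addedPartitions.get(producerId);
        // Reject (e.g. with INVALID_TXN_STATE) if this partition was never added to the txn.
        return added != null && added.contains(topicPartition);
    }

    // Driven by WriteTxnMarkers after EndTxn: the transaction is over for this producer.
    void onTransactionMarker(long producerId) {
        addedPartitions.remove(producerId);
    }
}
{code}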
An alternative fix would be to just wait for all produce requests to finish before issuing EndTxn (a small client-side sketch of this is below), but this seems less desirable when wanting to shut down and abort progress. Another alternative is to avoid issuing EndTxn and to just shut down, but this also seems undesirable and will block consumers until the transaction timeout expires. This may be the cause of KAFKA-5880.
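As a minimal sketch of that first alternative, again against the Java producer API and assuming a producer configured as in the earlier sketch (topic name is a placeholder): flush() blocks until every outstanding send has a response, so by the time abortTransaction() issues EndTxn there is no ProduceRequest left for the broker to reorder behind the transaction markers.

{code:java}
// Sketch only: wait for all outstanding produce responses before aborting.
// Assumes initTransactions() and beginTransaction() have already been called.
static void produceThenAbort(KafkaProducer<String, String> producer) {
    producer.send(new ProducerRecord<>("example-topic", "k1", "v1"));
    producer.send(new ProducerRecord<>("example-topic", "k2", "v2"));
    producer.flush();             // block until every in-flight send has completed
    producer.abortTransaction();  // only now issue EndTxn(committed=false)
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)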