A timeout in the offset commit request was fixed recently. This issue seems to be an instance of KAFKA-8334 <https://issues.apache.org/jira/browse/KAFKA-8334>.
On Mon, Jan 4, 2021 at 3:53 PM Kindernay Oliver <oliver.kinder...@eurowag.com.invalid> wrote:
> Hello,
>
> we are experiencing problems with offset commits timing out on brokers.
> This started when we began using transactions.
> send_offsets_to_transaction periodically (every few minutes) times out. We
> have a cluster of three brokers, and topics with 15 partitions. We use one
> transactional producer per partition and commit the transaction at least
> once per second. The timeout seemingly happens randomly, each time for
> a different producer instance and a different broker.
>
> 1. We used to get REQUEST_TIMED_OUT after 5 seconds. I understand that
> error came from a broker.
> 2. We tried raising offsets.commit.timeout.ms on the brokers to 60 seconds.
> 3. After the change, we are getting transactional operation timeouts
> after 60s, with the same periodicity. This is now a client error since the
> Kafka broker takes the full minute, after which we would probably see the
> same error message from the broker as previously.
> * <class
> 'cimpl.KafkaException'>/KafkaError{code=_TIMED_OUT,val=-185,str="Transactional
> operation timed out"}
> * %5|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out
> TxnOffsetCommitRequest in flight (after 59943ms, timeout #0)
> %4|1609747761.211|REQTMOUT|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Timed out 1
> in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
> %7|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP) (_TIMED_OUT)
> %3|1609747761.212|FAIL|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: 1 request(s) timed
> out: disconnect (after 1544886ms in state UP)
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> UP -> DOWN
> %7|1609747761.212|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> DOWN -> INIT
> %7|1609747761.212|REQERR|rdkafka#producer-10| [thrd:main]:
> 10.48.111.102:9092/2: MetadataRequest failed: Local: Timed out: actions
> Retry
> %7|1609747761.312|STATE|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Broker changed state
> INIT -> TRY_CONNECT
> %7|1609747761.312|RETRY|rdkafka#producer-10| [thrd:
> 10.48.111.102:9092/bootstrap]: 10.48.111.102:9092/2: Moved 1 retry
> buffer(s) to output queue
>
> 1. Broker system metrics (disk, network, CPU, memory) did not indicate
> any bottleneck. We nevertheless tried upgrading the broker VMs to a larger
> size, with no change in behaviour.
> 2. This happens on our test cluster and on our prod cluster. It seems
> to happen less frequently on the test cluster (it has
> lower traffic and fewer resources).
> 3.
We use python's confluent_kafka 1.5.0 - based on librdkafka 1.5.0 > 4. Broker package version is confluent-kafka-2.11-2.0.1 > 5. I enabled TRACE log level for everything on the test cluster > > This is a trace log from the broker. Client logs indicate that the > timed-out operation started at 08:08:21: > > 2021-01-04 08:08:15,413] DEBUG TransactionalId mu-data-generator-7 > complete transition from PrepareCommit to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), > txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:08:15,413] DEBUG [Transaction State Manager 2]: Updating > mu-data-generator-7's transaction state to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=CompleteCommit, topicPartitions=Set(), > txnStartTimestamp=1609747693580, txnLastUpdateTimestamp=1609747694751) with > coordinator epoch 375 for mu-data-generator-7 succeeded > (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:08:15,413] TRACE [Transaction Marker Channel Manager 2]: > Completed transaction for mu-data-generator-7 with coordinator epoch 375, > final state after commit: CompleteCommit > (kafka.coordinator.transaction.TransactionMarkerChannelManager) > [2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 > prepare transition from CompleteCommit to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending > new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, > 
topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) for > transaction id mu-data-generator-7 with coordinator epoch 375 to the local > transaction log (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:08:21,267] DEBUG TransactionalId mu-data-generator-7 > complete transition from CompleteCommit to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:08:21,267] DEBUG [Transaction State Manager 2]: Updating > mu-data-generator-7's transaction state to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) with > coordinator epoch 375 for mu-data-generator-7 succeeded > (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:26,828] DEBUG TransactionalId mu-data-generator-7 > prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009, > producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareEpochFence, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747701097) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:26,829] DEBUG TransactionalId mu-data-generator-7 > prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009, > producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:26,829] TRACE [Transaction State Manager 2]: Appending > new metadata 
TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=PrepareAbort, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) for > transaction id mu-data-generator-7 with coordinator epoch 375 to the local > transaction log (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 > complete transition from Ongoing to TxnTransitMetadata(producerId=10450009, > producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:26,978] DEBUG [Transaction State Manager 2]: Updating > mu-data-generator-7's transaction state to > TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=PrepareAbort, > topicPartitions=Set(__consumer_offsets-45), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) with > coordinator epoch 375 for mu-data-generator-7 succeeded > (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:26,978] DEBUG TransactionalId mu-data-generator-7 > prepare transition from PrepareAbort to > TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:26,979] INFO [TransactionCoordinator id=2] Completed > rollback ongoing transaction of transactionalId: mu-data-generator-7 due to > timeout (kafka.coordinator.transaction.TransactionCoordinator) > [2021-01-04 08:09:26,979] TRACE [Transaction Marker Channel Manager 2]: > Added marker TxnMarkerEntry{producerId=10450009, producerEpoch=6502, > coordinatorEpoch=375, result=ABORT, 
partitions=[__consumer_offsets-45]} for > transactional id mu-data-generator-7 to destination broker 2 > (kafka.coordinator.transaction.TransactionMarkerChannelManager) > [2021-01-04 08:09:27,229] TRACE [Transaction Marker Channel Manager 2]: > Completed sending transaction markers for mu-data-generator-7 as ABORT > (kafka.coordinator.transaction.TransactionMarkerChannelManager) > [2021-01-04 08:09:27,229] DEBUG [Transaction Marker Channel Manager 2]: > Sending mu-data-generator-7's transaction markers for > TransactionMetadata(transactionalId=mu-data-generator-7, > producerId=10450009, producerEpoch=6502, txnTimeoutMs=60000, > state=PrepareAbort, pendingState=Some(CompleteAbort), > topicPartitions=Set(), txnStartTimestamp=1609747701097, > txnLastUpdateTimestamp=1609747766829) with coordinator epoch 375 succeeded, > trying to append complete transaction log now > (kafka.coordinator.transaction.TransactionMarkerChannelManager) > [2021-01-04 08:09:27,230] TRACE [Transaction State Manager 2]: Appending > new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) for > transaction id mu-data-generator-7 with coordinator epoch 375 to the local > transaction log (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:27,453] DEBUG TransactionalId mu-data-generator-7 > complete transition from PrepareAbort to > TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=CompleteAbort, topicPartitions=Set(), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:27,453] DEBUG [Transaction State Manager 2]: Updating > mu-data-generator-7's transaction state to > TxnTransitMetadata(producerId=10450009, producerEpoch=6502, > txnTimeoutMs=60000, txnState=CompleteAbort, 
topicPartitions=Set(), > txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766978) with > coordinator epoch 375 for mu-data-generator-7 succeeded > (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:27,453] TRACE [Transaction Marker Channel Manager 2]: > Completed transaction for mu-data-generator-7 with coordinator epoch 375, > final state after commit: CompleteAbort > (kafka.coordinator.transaction.TransactionMarkerChannelManager) > [2021-01-04 08:09:38,059] DEBUG TransactionalId mu-data-generator-7 > prepare transition from CompleteAbort to > TxnTransitMetadata(producerId=10450009, producerEpoch=6503, > txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:38,060] TRACE [Transaction State Manager 2]: Appending > new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6503, > txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) for transaction > id mu-data-generator-7 with coordinator epoch 375 to the local transaction > log (kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:38,852] DEBUG TransactionalId mu-data-generator-7 > complete transition from CompleteAbort to > TxnTransitMetadata(producerId=10450009, producerEpoch=6503, > txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) > (kafka.coordinator.transaction.TransactionMetadata) > [2021-01-04 08:09:38,852] DEBUG [Transaction State Manager 2]: Updating > mu-data-generator-7's transaction state to > TxnTransitMetadata(producerId=10450009, producerEpoch=6503, > txnTimeoutMs=60000, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1609747778059) with > coordinator epoch 375 for mu-data-generator-7 succeeded > 
(kafka.coordinator.transaction.TransactionStateManager) > [2021-01-04 08:09:38,852] INFO [TransactionCoordinator id=2] Initialized > transactionalId mu-data-generator-7 with producerId 10450009 and producer > epoch 6503 on partition __transaction_state-29 > (kafka.coordinator.transaction.TransactionCoo > > Also this might be interesting: > > [2021-01-04 08:08:21,097] TRACE [Kafka Request Handler 9 on Broker 2], > Kafka request handler 9 on broker 2 handling request Request(processor=5, > connectionId=10.48.111.102:9092-10.48.110.74:45394-99, > session=Session(User:ANONYMOUS,/10.48.1 > [2021-01-04 08:08:21,097] DEBUG TransactionalId mu-data-generator-7 > prepare transition from CompleteCommit to > TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(__consumer_ > [2021-01-04 08:08:21,097] TRACE [ReplicaManager broker=2] Append > [Map(__transaction_state-29 -> [(record=DefaultRecord(offset=0, > timestamp=1609747701097, key=23 bytes, value=65 bytes))])] to local log > (kafka.server.ReplicaManager) > [2021-01-04 08:08:21,097] TRACE Inserting 158 bytes at end offset 14721706 > at position 23266940 with largest timestamp 1609747701097 at shallow offset > 14721706 (kafka.log.LogSegment) > [2021-01-04 08:08:21,097] TRACE Appended 158 to > /mnt/data/kafka/__transaction_state-29/00000000000014565986.log at end > offset 14721706 (kafka.log.LogSegment) > [2021-01-04 08:08:21,097] TRACE [Log partition=__transaction_state-29, > dir=/mnt/data/kafka] Appended message set with last offset: 14721706, first > offset: Some(14721706), next offset: 14721707, and messages: > [(record=DefaultRecord(offset=1 > [2021-01-04 08:08:21,097] DEBUG [ReplicaManager broker=2] Request key > __transaction_state-29 unblocked 0 fetch requests. 
> (kafka.server.ReplicaManager) > [2021-01-04 08:08:21,098] DEBUG [Partition __transaction_state-29 > broker=2] Skipping update high watermark since new hw (offset=14721706 > segment=[14565986:23266940]) is not larger than old hw (offset=14721706 > segment=[14565986:23266940]). > [2021-01-04 08:08:21,098] TRACE [ReplicaManager broker=2] 158 written to > log __transaction_state-29 beginning at offset 14721706 and ending at > offset 14721706 (kafka.server.ReplicaManager) > [2021-01-04 08:08:21,098] DEBUG [ReplicaManager broker=2] Produce to local > log in 1 ms (kafka.server.ReplicaManager) > [2021-01-04 08:08:21,098] TRACE Initial partition status for > __transaction_state-29 is [acksPending: true, error: 7, startOffset: > 14721706, requiredOffset: 14721707] (kafka.server.DelayedProduce) > [2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for > __transaction_state-29, current status [acksPending: true, error: 7, > startOffset: 14721706, requiredOffset: 14721707] > (kafka.server.DelayedProduce) > [2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 > broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker > 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition) > [2021-01-04 08:08:21,098] TRACE Checking produce satisfaction for > __transaction_state-29, current status [acksPending: true, error: 7, > startOffset: 14721706, requiredOffset: 14721707] > (kafka.server.DelayedProduce) > [2021-01-04 08:08:21,098] TRACE [Partition __transaction_state-29 > broker=2] Progress awaiting ISR acks for offset 14721707: acked: Set(broker > 2: 14721707), awaiting Set(broker 3: 14721706) (kafka.cluster.Partition) > [2021-01-04 08:08:21,098] TRACE [Transaction State Manager 2]: Appending > new metadata TxnTransitMetadata(producerId=10450009, producerEpoch=6501, > txnTimeoutMs=60000, txnState=Ongoing, > topicPartitions=Set(__consumer_offsets-45), txnStartTim > > > > Apart from upgrading the kafka version or upgrading to newer 
librdkafka
> when a new confluent_kafka release for Python comes out, I am out of ideas
> about what to try.
>
> I would be grateful for any suggestions about where to look to debug this
> kind of issue.
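For anyone reading the quoted broker trace, here is a minimal sketch (not part of the original thread) that pulls the transaction fields out of a single TxnTransitMetadata log entry and compares how long the transaction had been open against txnTimeoutMs. The helper name txn_summary and the SAMPLE constant are made up for illustration; the sample entry is the 08:09:26,829 PrepareAbort line from the trace with the mail wrapping removed. It assumes one complete log entry per string and only handles entries that actually contain a txnState field.

```python
import re

# Fields of interest inside a TxnTransitMetadata(...) entry of the broker log.
FIELD_RE = re.compile(
    r"(txnTimeoutMs|txnState|txnStartTimestamp|txnLastUpdateTimestamp)=(-?\w+)"
)


def txn_summary(log_line):
    """Summarise one TxnTransitMetadata entry (hypothetical helper).

    Note: entries in state Empty carry txnStartTimestamp=-1, so open_ms is
    not meaningful for them.
    """
    fields = dict(FIELD_RE.findall(log_line))
    start_ms = int(fields["txnStartTimestamp"])
    last_update_ms = int(fields["txnLastUpdateTimestamp"])
    timeout_ms = int(fields["txnTimeoutMs"])
    open_ms = last_update_ms - start_ms
    return {
        "state": fields["txnState"],
        "open_ms": open_ms,
        "timeout_ms": timeout_ms,
        "exceeded_timeout": open_ms > timeout_ms,
    }


# The PrepareAbort entry from the quoted trace, joined back into one line.
SAMPLE = (
    "[2021-01-04 08:09:26,829] DEBUG TransactionalId mu-data-generator-7 "
    "prepare transition from Ongoing to TxnTransitMetadata(producerId=10450009, "
    "producerEpoch=6502, txnTimeoutMs=60000, txnState=PrepareAbort, "
    "topicPartitions=Set(__consumer_offsets-45), "
    "txnStartTimestamp=1609747701097, txnLastUpdateTimestamp=1609747766829) "
    "(kafka.coordinator.transaction.TransactionMetadata)"
)

if __name__ == "__main__":
    print(txn_summary(SAMPLE))
```

For this entry the transaction had been open for 65732 ms against a 60000 ms transaction timeout, which is consistent with the coordinator's "Completed rollback ongoing transaction ... due to timeout" message a moment later in the trace.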