[ https://issues.apache.org/jira/browse/KAFKA-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026030#comment-17026030 ]
zhongyushuo commented on KAFKA-8334:
------------------------------------

Hi @windkithk, we have also hit this issue. With the patch installed, the error still occurs. Do you have any other suggestions? Thank you for your help.

Occasional OffsetCommit Timeout
-------------------------------

                Key: KAFKA-8334
                URL: https://issues.apache.org/jira/browse/KAFKA-8334
            Project: Kafka
         Issue Type: Bug
         Components: core
   Affects Versions: 1.1.1
           Reporter: windkithk
           Priority: Major
        Attachments: kafka8334.patch, offsetcommit_p99.9_cut.jpg

h2. 1) Issue Summary
Since we upgraded to 1.1, we have observed occasional OffsetCommit timeouts from clients:
{code:java}
Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out
{code}
Normally OffsetCommit completes within 10 ms, but at the 99.9th percentile the request duration jumps to 5000 ms, the value of offsets.commit.timeout.ms.

Here is a screenshot of the Prometheus metric kafka_network_request_duration_milliseconds (offsetcommit_p99.9_cut.jpg).

The duration breakdown shows that most of the time was spent in the "Remote" scope (below is a request log line produced by our in-house slow request logger):
{code:java}
[2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
{code}

h2. 2) What changed in 1.1 from 0.10.2.1?
# Log level changed
In the 1.1 Kafka consumer, logging of timed-out OffsetCommits changed from DEBUG to WARN.
# Group lock is acquired when trying to complete the DelayedProduce of an OffsetCommit
This was added after 0.11.0.2
(Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
(PR: https://github.com/apache/kafka/pull/4103)
(In 1.1: https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
# Followers do incremental fetches
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
{quote}
OffsetCommit appends a message with the committed offset to a partition of the topic `__consumer_offsets`.
During the append, it creates a DelayedProduce that uses GroupMetadata.lock (a ReentrantLock) as its lock and adds it to delayedProducePurgatory.
When a follower fetches that partition of `__consumer_offsets` and the HighWaterMark increases, delayedProducePurgatory is traversed and all operations related to the partition may be completed.
{quote}
*A DelayedProduce from an OffsetCommit may not be completed if the group metadata lock is held by others.*

h2. 4) Reproduce
h4. Methodology
{code}
1. A DelayedProduce on __consumer_offsets cannot be completed while group.lock is held by others.
2. We spam requests like Heartbeat to keep acquiring group.lock.
3. We keep sending OffsetCommit and check the processing time.
{code}
h4. Reproduce Script
https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
# jammer.py - joins the group "wilson-tester" and keeps spamming Heartbeat
# tester.py - fetches one message, does a long processing step (or sleeps), and then commits the offset

h4. Result
||Seq||Operation||Lock||
|1|OffsetCommit Request| |
|2|Append to local __consumer_offsets| |
|3|DelayedProduce tryComplete| |
|4|Added into delayedProducePurgatory| |
|5|FetchFollower1 Fetch| |
|6|FetchFollower2 Fetch| |
|7|Heartbeat Request|Acquired group.lock|
|8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock|
|9|Heartbeat Response|Released group.lock|
| |(No FetchFollower requests on the __consumer_offsets partition)| |
|10|OffsetCommit Response (Timeout)| |

h4. Trace Log
{code}
// The OffsetCommit Request
[2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)

// Initial check of DelayedProduce:tryCompleteElseWatch
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
[2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
[2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)

// Followers fetch the new message in __consumer_offsets-48, with Heartbeats in between
// DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
// https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)

// OffsetCommit timed out
[2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
{code}

h4. OffsetCommit, FetchFollower, Heartbeat
When a FetchFollower request arrives and advances the HighWaterMark, it is supposed to complete the DelayedProduce, and with it the OffsetCommit request.
However, it only does so if it can obtain the group metadata lock (retrying only once, immediately).
As a result, the DelayedProduce is only checked again the next time a FetchFollower request arrives (and advances the HighWaterMark).
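The lock contention described above can be made concrete with a minimal Python sketch (Python to match the reproduce scripts; the broker itself is Scala). `DelayedProduceSketch` and `simulate_fetch_while_heartbeat_holds_lock` are hypothetical names, and the single immediate retry driven by a pending flag is an assumption modeled on the `tryLock` loop of DelayedOperation.maybeTryComplete in 1.1, not a copy of it. A background thread plays the Heartbeat handler holding group.lock while the follower fetch advances the HighWaterMark to the `requiredOffset` (23135) seen in the trace log.

```python
import threading


class DelayedProduceSketch:
    """Hypothetical, simplified stand-in for a DelayedProduce whose completion
    is guarded by the group metadata lock. Not Kafka's actual class."""

    def __init__(self, group_lock, required_offset):
        self.lock = group_lock                  # shared with Heartbeat handling
        self.required_offset = required_offset
        self.high_watermark = 0
        self.completed = False
        self._try_complete_pending = False      # assumed analogue of a "pending" flag

    def try_complete(self):
        # Satisfied once the HighWaterMark reaches the offset the append requires.
        if self.high_watermark >= self.required_offset:
            self.completed = True
        return self.completed

    def maybe_try_complete(self):
        # do/while loop: attempt under a non-blocking lock, retrying at most once when contended.
        done = False
        while True:
            if self.lock.acquire(blocking=False):
                try:
                    self._try_complete_pending = False
                    done = self.try_complete()
                finally:
                    self.lock.release()
                retry = self._try_complete_pending
            else:
                # Lock held elsewhere: retry once, then rely on the holder to notice
                # the pending flag. A Heartbeat handler never looks at it.
                retry = not self._try_complete_pending
                self._try_complete_pending = True
            if self.completed or not retry:
                return done


def simulate_fetch_while_heartbeat_holds_lock():
    group_lock = threading.RLock()              # stands in for GroupMetadata.lock (ReentrantLock)
    op = DelayedProduceSketch(group_lock, required_offset=23135)
    held, release = threading.Event(), threading.Event()

    def heartbeat_handler():
        with group_lock:                        # Heartbeat holds group.lock
            held.set()
            release.wait()

    heartbeat = threading.Thread(target=heartbeat_handler)
    heartbeat.start()
    held.wait()

    op.high_watermark = 23135                   # follower fetch advances the HW ...
    result = op.maybe_try_complete()            # ... but cannot take group.lock

    release.set()                               # Heartbeat response sent, lock released
    heartbeat.join()
    return op, result


if __name__ == "__main__":
    op, result = simulate_fetch_while_heartbeat_holds_lock()
    print(result, op.completed)                 # False False: the attempt was dropped
    print(op.maybe_try_complete())              # True: only a later check completes it
```

Nothing in the sketch re-checks the operation after the Heartbeat releases the lock; in the broker, per the analysis above, that takes another HighWaterMark update on the partition or the 5000 ms offsets.commit.timeout.ms.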
The next HighWaterMark update on that __consumer_offsets partition usually only comes with the next OffsetCommit, which explains why we observe OffsetCommit request timeouts (high OffsetCommit latency) during low-traffic periods.
||OffsetCommit||FetchFollower||Heartbeat||
|replicaManager.appendRecords| | |
|→ delayedProducePurgatory.tryCompleteElseWatch| | |
|→ tryComplete()| | |
|→ watchForOperation()| | |
|→ operation.maybeTryComplete()| | |
| |partition.updateReplicaLogReadResult| |
| |→ tryCompleteDelayedRequests| |
| |→ delayedProducePurgatory.checkAndComplete| |
| |→ watchers.tryCompleteWatched| |
| |→ operation.maybeTryComplete()|group.lock|
| |→ group.tryLock| |
| |→ false| |
| | |group.unlock|

h2. 5) Solution
We can add a separate executor that later retries completing any DelayedOperation that failed to obtain the lock.
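The proposed solution can be illustrated with a minimal Python sketch of a separate retry executor. This is a hedged illustration of the idea, not the attached kafka8334.patch: `GuardedOp` and `RetryExecutor` are hypothetical names, and the single worker thread with a fixed backoff is an assumption. An operation whose non-blocking lock attempt fails is queued for a background worker that keeps retrying instead of dropping the attempt.

```python
import queue
import threading
import time


class GuardedOp:
    """Hypothetical stand-in for a DelayedOperation guarded by a shared lock."""

    def __init__(self, lock, ready):
        self.lock = lock
        self.ready = ready                      # callable: is the completion condition met?
        self.completed = threading.Event()

    def try_lock_and_complete(self):
        """Return True if the check ran (lock acquired), False if the lock was busy."""
        if not self.lock.acquire(blocking=False):
            return False
        try:
            if self.ready():
                self.completed.set()
            return True
        finally:
            self.lock.release()


class RetryExecutor:
    """Sketch of the proposed fix: attempts that lose the lock race are
    handed to a background worker and retried later instead of dropped."""

    def __init__(self, backoff_s=0.01):
        self._queue = queue.Queue()
        self._backoff_s = backoff_s             # assumed fixed delay between retries
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def maybe_try_complete(self, op):
        if not op.try_lock_and_complete():
            self._queue.put(op)                 # defer instead of giving up

    def _run(self):
        while True:
            op = self._queue.get()
            if op is None:                      # shutdown sentinel
                return
            if not op.try_lock_and_complete():
                time.sleep(self._backoff_s)     # lock still busy: back off, then requeue
                self._queue.put(op)

    def shutdown(self):
        self._queue.put(None)
        self._worker.join()


if __name__ == "__main__":
    group_lock = threading.Lock()               # plain Lock so this thread can play the Heartbeat handler
    op = GuardedOp(group_lock, ready=lambda: True)  # HW already at requiredOffset
    executor = RetryExecutor()

    group_lock.acquire()                        # Heartbeat holds group.lock
    executor.maybe_try_complete(op)             # follower fetch: lock busy, op is queued
    print(op.completed.is_set())                # False while the lock is held
    group_lock.release()                        # Heartbeat response sent

    print(op.completed.wait(timeout=1.0))       # the worker's next retry should complete it
    executor.shutdown()
```

The design trade-off: a retry no longer depends on a new HighWaterMark change, so the commit can complete shortly after the Heartbeat releases group.lock rather than at the 5000 ms timeout, at the cost of an extra thread and repeated retries under sustained lock contention.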