[ 
https://issues.apache.org/jira/browse/KAFKA-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-8334:
------------------------------

    Fix Version/s: 2.7.0
         Assignee: Chia-Ping Tsai
       Resolution: Fixed

Merged the PR to trunk.

> Occasional OffsetCommit Timeout
> -------------------------------
>
>                 Key: KAFKA-8334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8334
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.1
>            Reporter: windkithk
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.7.0
>
>         Attachments: kafka8334.patch, offsetcommit_p99.9_cut.jpg
>
>
> h2. 1) Issue Summary
> Since upgrading to 1.1, we have observed occasional OffsetCommit timeouts 
> from clients:
> {code:java}
> Offset commit failed on partition sometopic-somepartition at offset 
> someoffset: The request timed out{code}
> Normally an OffsetCommit completes within 10 ms, but at the 99.9th 
> percentile the request duration jumps up to 5000 ms 
> (offsets.commit.timeout.ms)
> Here is a screenshot of prometheus recording 
> kafka_network_request_duration_milliseconds
> (offsetcommit_p99.9_cut.jpg)
> and after checking the duration breakdown, most of the time was spent in the 
> "Remote" scope.
> Below is a request log line produced by our in-house slow request logger:
> {code:java}
> [2019-04-16 13:06:20,339] WARN Slow 
> response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, 
> clientId=kafka-python-1.4.6, correlationId=953) -- 
> {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]}
>  from 
> connection;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
>  (kafka.request.logger)
> {code}
> h2. 2) What got changed in 1.1 from 0.10.2.1?
> # Log level changed
> In the 1.1 Kafka consumer, logging of timed-out OffsetCommits was changed 
> from DEBUG to WARN
> # The group lock is acquired when trying to complete the DelayedProduce of 
> an OffsetCommit
> This was added after 0.11.0.2
> (Ticket: https://issues.apache.org/jira/browse/KAFKA-6042)
> (PR: https://github.com/apache/kafka/pull/4103)
> (in 1.1 
> https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
> # Followers do incremental fetches
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
> {quote}
> OffsetCommit appends a message with the committed offset to a partition of 
> the topic `__consumer_offsets`
> During the append, it creates a DelayedProduce holding a reference to 
> GroupMetadata.lock (a ReentrantLock) and adds it to delayedProducePurgatory
> When a follower fetches the partition of topic `__consumer_offsets` and 
> causes an increase in the HighWaterMark, delayedProducePurgatory is traversed 
> and all operations related to the partition may be completed
> {quote}
> *A DelayedProduce from an OffsetCommit may not be completed if the group 
> metadata lock is held by another request*
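The single-attempt completion behaviour can be sketched in Python (class and method names here are illustrative stand-ins for the Scala broker code, not the actual API):

```python
import threading

class DelayedProduce:
    """Sketch of the 1.1 behaviour: completion is attempted with a single
    non-blocking tryLock and silently skipped if the group lock is held."""

    def __init__(self, group_lock):
        self.group_lock = group_lock   # stands in for GroupMetadata.lock
        self.completed = False

    def try_complete(self):
        # Assume the HighWaterMark has advanced past requiredOffset,
        # so the operation can complete.
        self.completed = True
        return True

    def maybe_try_complete(self):
        # One tryLock, no retry: if another request (e.g. a Heartbeat)
        # holds the group lock, the DelayedProduce stays pending until
        # the next follower fetch triggers another check.
        if not self.group_lock.acquire(blocking=False):
            return False
        try:
            return self.try_complete()
        finally:
            self.group_lock.release()

group_lock = threading.Lock()
op = DelayedProduce(group_lock)

group_lock.acquire()                    # a Heartbeat holds the lock
print(op.maybe_try_complete())          # False: the attempt is skipped
group_lock.release()                    # Heartbeat responds

print(op.maybe_try_complete())          # True: completes uncontended
```

The key point is the `blocking=False` acquire: when it fails, nothing reschedules the check, so completion depends entirely on some future event traversing the purgatory again.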
> h2. 4) Reproduce
> h4. Methodology
> {code}
> 1. A DelayedProduce on __consumer_offsets cannot be completed while 
> group.lock is held by another request
> 2. We spam requests such as Heartbeat to keep acquiring group.lock
> 3. We keep sending OffsetCommits and measure the processing time
> {code}
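The methodology can be modelled without a broker at all (a hypothetical stand-in, not the reproduce scripts themselves): one thread plays the Heartbeat jammer re-acquiring the group lock in a loop, while the main thread makes single non-blocking completion attempts, as the follower fetch path does:

```python
import threading
import time

group_lock = threading.Lock()
stop = threading.Event()

def jammer():
    # Keep re-acquiring the lock in a tight loop, like jammer.py's
    # Heartbeat spam keeps the group lock busy on the broker.
    while not stop.is_set():
        with group_lock:
            time.sleep(0.001)   # hold briefly, release, repeat

t = threading.Thread(target=jammer)
t.start()

failures = 0
for _ in range(200):
    # Each attempt mirrors maybeTryComplete: one non-blocking tryLock.
    if group_lock.acquire(blocking=False):
        group_lock.release()
    else:
        failures += 1
    time.sleep(0.0005)

stop.set()
t.join()
print(f"{failures}/200 completion attempts skipped under contention")
```

Under this kind of contention most attempts fail the tryLock, which is exactly why the DelayedProduce sits in purgatory until it times out.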
> h4. Reproduce Script
> https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
> # jammer.py - joins the group "wilson-tester" and keeps spamming Heartbeats
> # tester.py - fetches one message, does a long processing step (or sleep), 
> and then commits the offset
> h4. Result
> ||Seq||Operation||Lock||
> |1|OffsetCommit Request| |
> |2|Append to local __consumer_offsets| |
> |3|DelayedProduce tryComplete| |
> |4|Added into delayedProducePurgatory| |
> |5|FetchFollower1 Fetch| |
> |6|FetchFollower2 Fetch| |
> |7|Heartbeat Request|Acquired group.lock|
> |8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock|
> |9|Heartbeat Response|Released group.lock|
> | |(No further FetchFollower requests on the __consumer_offsets partition)| |
> |10|OffsetCommit Response (Timeout)| |
> h4. Trace Log
> {code}
> // The OffsetCommit Request
> [2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, 
> clientId=kafka-python-1.4.6, correlationId=2114) -- 
> {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
>  
> // Initial Check of DelayedProduce:tryCompleteElseWatch
> // 
> https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
> [2019-04-15 23:59:53,736] TRACE Initial partition status for 
> __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, 
> requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for 
> __consumer_offsets-48, current status [acksPending: true, error: 7, 
> startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for 
> __consumer_offsets-48, current status [acksPending: true, error: 7, 
> startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
>  
> // Follower fetching the new message in __consumer_offsets-48, and with 
> Heartbeat in between
> // DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when 
> it cannot obtain the lock (group.lock)
> // 
> https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response 
> org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation 
> id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, 
> clientId=kafka-python-1.4.6, correlationId=1492703) -- 
> {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response 
> org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 
> 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=FETCH, apiVersion=7, 
> clientId=broker-5-fetcher-0, correlationId=1788883) -- 
> {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, 
> clientId=kafka-python-1.4.6, correlationId=1492704) -- 
> {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response 
> org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation 
> id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, 
> clientId=kafka-python-1.4.6, correlationId=1492705) -- 
> {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling 
> request:RequestHeader(apiKey=FETCH, apiVersion=7, 
> clientId=broker-4-fetcher-0, correlationId=1788529) -- 
> {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]}
>  from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS 
> (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response 
> org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation 
> id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
>  
> // OffsetCommit Timed out
> [2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with 
> correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed 
> due to org.apache.kafka.common.errors.TimeoutException 
> (kafka.server.KafkaApis)
> {code}
> h4. OffsetCommit, FetchFollower, Heartbeat
> When a follower fetch arrives and advances the HighWaterMark, it is supposed 
> to complete the DelayedProduce and thus the OffsetCommit request.
> However, it only does so when it can acquire the GroupMetadata lock (it tries 
> exactly once, immediately, and gives up otherwise).
> As a result, the DelayedProduce is only checked the next time a follower 
> fetch advances the HighWaterMark, which usually means after the next 
> OffsetCommit. This explains why we observe OffsetCommit timeouts (high 
> OffsetCommit latency) during low-traffic periods.
> ||OffsetCommit||FetchFollower||Heartbeat||
> |replicaManager.appendRecords| | |
> |→ delayedProducePurgatory.tryCompleteElseWatch| | |
> |→ tryComplete()| | |
> |→ watchForOperation()| | |
> |→ operation.maybeTryComplete()| | |
> | |partition.updateReplicaLogReadResult| |
> | |→ tryCompleteDelayedRequests| |
> | |→ delayedProducePurgatory.checkAndComplete| |
> | |→ watchers.tryCompleteWatched| |
> | |→ operation.maybeTryComplete()|group.lock|
> | |→ group.tryLock| |
> | |→ false| |
> | | |group.unlock|
> h2. 5) Solution
> We can use a separate executor to retry, at a later time, the completion of 
> a DelayedOperation that failed to obtain the lock
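One possible shape of that fix, sketched in illustrative Python (this is an assumption about the approach, not the actual patch in kafka8334.patch): when the non-blocking tryLock fails, hand the operation to a dedicated executor that retries the completion with a blocking acquire, off the request-handling path.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical retry executor for operations that lost the tryLock race.
retry_executor = ThreadPoolExecutor(max_workers=1)

class DelayedProduce:
    def __init__(self, group_lock):
        self.group_lock = group_lock
        self.completed = threading.Event()

    def try_complete(self):
        self.completed.set()
        return True

    def maybe_try_complete(self):
        if self.group_lock.acquire(blocking=False):
            try:
                return self.try_complete()
            finally:
                self.group_lock.release()
        # Lock busy: schedule a retry instead of dropping the attempt,
        # so completion no longer waits for the *next* follower fetch.
        retry_executor.submit(self._retry)
        return False

    def _retry(self):
        with self.group_lock:      # blocking acquire is acceptable here,
            self.try_complete()    # off the request-handling path

lock = threading.Lock()
op = DelayedProduce(lock)

lock.acquire()                     # e.g. a Heartbeat holds the lock
op.maybe_try_complete()            # the immediate attempt is skipped...
lock.release()
op.completed.wait(timeout=5)       # ...but the scheduled retry completes it
print(op.completed.is_set())
retry_executor.shutdown()
```

The trade-off is extra threads and a small completion delay, in exchange for never leaving a satisfiable DelayedProduce stranded in purgatory until its timeout.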



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
