[ 
https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-13559.
------------------------------------
    Fix Version/s: 3.4.0
         Reviewer: Rajini Sivaram
       Resolution: Fixed

> The broker's  ProduceResponse may be delayed for 300ms
> ------------------------------------------------------
>
>                 Key: KAFKA-13559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13559
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>    Affects Versions: 2.7.0
>            Reporter: frankshi
>            Assignee: Badai Aqrandista
>            Priority: Major
>             Fix For: 3.4.0
>
>         Attachments: image-1.png, image-2.png, 
> image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, 
> image-3.png, image-5.png, image-6.png, image-7.png, image.png
>
>
> Hi team:
> We have found that a value in the source code 
> [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922]
>  may cause the broker’s ProduceResponse to be delayed by up to 300ms.
>  * Server version: 2.13-2.7.0.
>  * Client version: confluent-kafka-python-1.5.0.
> We have set the client’s configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms =  80
> sasl.mechanism = PLAIN
> security.protocol = SASL_SSL
> ......
> {code}
> Because we set acks=1, the client sends ProduceRequests and receives 
> ProduceResponses from the brokers. The leader broker does not need to wait for 
> the other in-sync replicas to acknowledge the write; it can reply to the client 
> with a ProduceResponse directly. In our setup, the ping RTT between the client 
> and the Kafka brokers is about 10ms, and most of the time the responses are 
> received about 10ms after the ProduceRequests are sent. But sometimes the 
> responses are received about 300ms later.
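> For reference, a minimal sketch of an equivalent producer configuration, written 
> against the Java client from Scala (we used confluent-kafka-python; the broker 
> address, topic name and credentials below are placeholders), with a delivery 
> callback that logs the per-record round-trip time:
> {code:java}
> import java.util.Properties
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
> import org.apache.kafka.common.config.SaslConfigs
> import org.apache.kafka.common.serialization.StringSerializer
> 
> object ProduceRttSketch extends App {
>   val props = new Properties()
>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093")   // placeholder
>   props.put(ProducerConfig.ACKS_CONFIG, "1")                          // leader-only ack
>   props.put(ProducerConfig.LINGER_MS_CONFIG, "0")
>   props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "100")
>   props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "80")
>   props.put("security.protocol", "SASL_SSL")                          // sasl.jaas.config / truststore omitted
>   props.put(SaslConfigs.SASL_MECHANISM, "PLAIN")
>   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
>   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
> 
>   val producer = new KafkaProducer[String, String](props)
>   for (i <- 1 to 100) {
>     val sentAt = System.nanoTime()
>     producer.send(new ProducerRecord("test-topic", s"key-$i", s"value-$i"),   // placeholder topic
>       (_, exception) => {
>         val rttMs = (System.nanoTime() - sentAt) / 1e6
>         if (exception == null) println(f"record $i%d acked after $rttMs%.1f ms")
>         else println(s"record $i failed: $exception")
>       })
>   }
>   producer.flush()
>   producer.close()
> }
> {code}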
> The following shows the log from the client.
> {code:java}
> 2021-11-26 02:31:30,567  Sent partial ProduceRequest (v7, 0+16527/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent partial ProduceRequest (v7, 16527+16384/37366 
> bytes, CorrId 2753)
> 2021-11-26 02:31:30,568  Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 
> 2753)
> 2021-11-26 02:31:30,570  Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571  Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572  Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2751, 
> rtt 9.79ms)
> 2021-11-26 02:31:30,572  Received ProduceResponse (v7, 69 bytes, CorrId 2752, 
> rtt 10.34ms)
> 2021-11-26 02:31:30,573  Received ProduceResponse (v7, 69 bytes, CorrId 2753, 
> rtt 10.11ms)
> 2021-11-26 02:31:30,872  Received ProduceResponse (v7, 69 bytes, CorrId 2754, 
> rtt 309.69ms)
> 2021-11-26 02:31:30,883  Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887  Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888  Received ProduceResponse (v7, 69 bytes, CorrId 2755, 
> rtt 318.85ms)
> 2021-11-26 02:31:30,893  Sent partial ProduceRequest (v7, 0+16527/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,894  Sent partial ProduceRequest (v7, 16527+16384/37562 
> bytes, CorrId 2759)
> 2021-11-26 02:31:30,895  Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 
> 2759)
> 2021-11-26 02:31:30,896  Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2756, 
> rtt 317.74ms)
> 2021-11-26 02:31:30,897  Received ProduceResponse (v7, 69 bytes, CorrId 2757, 
> rtt 4.22ms)
> 2021-11-26 02:31:30,899  Received ProduceResponse (v7, 69 bytes, CorrId 2758, 
> rtt 2.61ms){code}
>  
> The requests with CorrId 2753 and 2754 are sent almost at the same time, but 
> the response for 2754 is delayed by ~300ms. 
> We checked the logs on the broker.
>  
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed 
> request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka, 
> correlationId=2754) – {acks=1,timeout=80,numPartitions=1},response: 
> {responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}
> ],throttle_time_ms=0} from connection 
> 10.10.44.59:9093-10.10.0.68:31183-66;totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,clientInformation:ClientInformation(softwareName=confluent-kafka-python,
>  softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
> {code}
>  
>  
> According to this log, the time spent on the server side is very small, so 
> what causes the latency spikes?
> We also ran tcpdump on the server side and found that the delay comes from the 
> server side.
> The request with CorrId=2754 was received at 10:31:30.566172 and its response 
> was sent at 10:31:30.873518. So, the server's processing time for this request 
> is about {*}873-566=307ms{*}.
> Wireshark shows the CorrId=2754 ProduceRequest's timestamp and request info.
> !image-2021-12-21-14-45-22-716.png!
> Wireshark shows the CorrId=2754 ProduceResponse's timestamp and response info.
> !image-2021-12-21-14-44-56-689.png!
>  
> We checked the source code and found the problem. The broker’s Processor run 
> loop is as follows:
> !image-5.png|width=1001,height=449!
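> For readers who cannot view the attachment, here is a simplified paraphrase of 
> the 2.7 Processor.run loop (abridged, not the exact source; see image-5.png for 
> the real code), with comments marking where the delay described below can occur:
> {code:java}
> // Abridged sketch of kafka.network.Processor#run (2.7), for orientation only.
> override def run(): Unit = {
>   while (shouldRun.get()) {
>     configureNewConnections()   // register connections queued by the Acceptor
>     processNewResponses()       // drain the response queue, set up writes
>     poll()                      // selector.poll(0 or 300) - the 300ms wait happens here
>     processCompletedReceives()  // turn completed reads into requests for the IO threads
>     processCompletedSends()     // bookkeeping for responses that finished sending
>     processDisconnected()
>     closeExcessConnections()
>   }
> }
> {code}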
> Looking at the poll function, you can see that the {*}poll timeout value is 300ms{*}:
> {code:java}
> private def poll(): Unit = {
>     val pollTimeout = if (newConnections.isEmpty) 300 else 0
>     try selector.poll(pollTimeout)
>     catch {
>       case e @ (_: IllegalStateException | _: IOException) =>
>         // The exception is not re-thrown and any completed 
> sends/receives/connections/disconnections
>         // from this poll will be processed.
>         error(s"Processor $id poll failed", e)
>     }
>   }{code}
>  
> The following is the selector.poll function:
> !image-6.png!
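> The key point is the semantics of the underlying NIO selector: select(timeoutMs) 
> blocks until a registered channel becomes ready, another thread calls wakeup(), 
> or the timeout expires. Data that has already been read off the socket and is 
> buffered inside the application does not, by itself, make select() return early. 
> A minimal stand-alone sketch (plain java.nio, not Kafka code) illustrating the 
> timeout behaviour:
> {code:java}
> import java.nio.channels.Selector
> 
> object SelectTimeoutSketch extends App {
>   val selector = Selector.open()
>   val start = System.nanoTime()
>   // No channel is ready and nobody calls wakeup(), so this blocks the full 300ms,
>   // which is exactly the wait the Processor can hit in its idle poll.
>   val readyKeys = selector.select(300L)
>   printf("select returned %d ready keys after %.0f ms%n",
>     readyKeys, (System.nanoTime() - start) / 1e6)
>   selector.close()
> }
> {code}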
> So, we may encounter the following situation:
>  * The first run of the loop:
>  ** poll -> request(s) received -> processCompletedReceives -> request handed to 
> the request queue.
>  * The second run of the loop:
>  ** processNewResponses -> the response queue is empty (an IO thread is still 
> processing the request) -> poll() -> select(timeout=0) -> 
> madeReadProgressLastCall = false
>  * The third run of the loop:
>  ** processNewResponses -> the response queue is *NOT* empty -> poll() -> 
> select(timeout=300) returns immediately, because the response data is already 
> available and the fd has been added to the write fd set.
>  * The fourth run of the loop:
>  ** the response queue is empty -> poll() -> select(timeout=300) waits for 300ms 
> or until new data arrives on the socket (see the select() sketch above).
> The server may receive several produce requests on a connection at the same 
> time, but it only handles one request per connection at a time: only after the 
> previous response has finished sending can it handle the next request. While 
> the previous request is being handled, the other requests are kept buffered. 
> So, if the first response has been sent and no new data arrives on the socket 
> at that moment, the buffered request may be delayed by up to 300ms before it 
> is processed. 
> {color:#ff0000}*We suggest changing the poll timeout value from 300 to 10.*{color}
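> The change we are suggesting would look like this (shown as a diff against the 
> poll function quoted above):
> {code:java}
> // in kafka.network.Processor#poll
> -    val pollTimeout = if (newConnections.isEmpty) 300 else 0
> +    val pollTimeout = if (newConnections.isEmpty) 10 else 0
> {code}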
> *The following two figures compare the request-response RTT values.*
> *!image-3.png!*
>  RTT values when poll timeout value = 300
>  
> !image-2.png!
> RTT values when poll timeout value = 10
>  
> Another question: why does the server's log show that the total time is very small?
> Because the start time is set in the processCompletedReceives function shown 
> below; while the request is sitting in the buffer the timer has not started 
> yet, so totalTime does not include the time the request spent in the buffer.
> !image-7.png!
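> For readers who cannot view the attachment, a heavily abridged paraphrase of 
> processCompletedReceives (not the exact source; see image-7.png), marking where 
> the request clock starts:
> {code:java}
> // Abridged sketch of kafka.network.Processor#processCompletedReceives (2.7).
> private def processCompletedReceives(): Unit = {
>   selector.completedReceives.forEach { receive =>
>     val header = parseRequestHeader(receive.payload)
>     // The request's start time is taken here, after poll() has handed back the
>     // completed receive, not when the bytes arrived on the socket, so time spent
>     // buffered behind the previous in-flight request is invisible to totalTime.
>     val nowNanos = time.nanoseconds()
>     // ... build a RequestChannel.Request with startTimeNanos = nowNanos and
>     // enqueue it on the request channel ...
>   }
> }
> {code}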



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
