[ https://issues.apache.org/jira/browse/KAFKA-13559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-13559.
------------------------------------
    Fix Version/s: 3.4.0
         Reviewer: Rajini Sivaram
       Resolution: Fixed

> The broker's ProduceResponse may be delayed for 300ms
> ------------------------------------------------------
>
>                 Key: KAFKA-13559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13559
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>    Affects Versions: 2.7.0
>            Reporter: frankshi
>            Assignee: Badai Aqrandista
>            Priority: Major
>             Fix For: 3.4.0
>
>         Attachments: image-1.png, image-2.png, image-2021-12-21-14-44-56-689.png, image-2021-12-21-14-45-22-716.png, image-3.png, image-5.png, image-6.png, image-7.png
>
>
> Hi team,
> We have found that a value in the source code [here|https://github.com/apache/kafka/blob/2.7/core/src/main/scala/kafka/network/SocketServer.scala#L922] may cause the broker's ProduceResponse to be delayed for 300ms.
> * Server version: 2.13-2.7.0.
> * Client version: confluent-kafka-python-1.5.0.
> We have set the client's configuration as follows:
> {code:java}
> linger.ms = 0
> acks = 1
> delivery.timeout.ms = 100
> request.timeout.ms = 80
> sasl.mechanism = "PLAIN"
> security.protocol = "SASL_SSL"
> ......
> {code}
> Because we set acks=1, the client sends ProduceRequests and receives ProduceResponses from the brokers. The leader broker does not need to wait for the ISRs to write the data to disk successfully; it can reply to the client with a ProduceResponse directly. In our setup, the ping time between the client and the Kafka brokers is about 10ms, and most of the time the responses are received about 10ms after the ProduceRequests are sent. Sometimes, however, a response is received about 300ms later.
> The following shows the log from the client.
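> As a rough sketch, the reported settings map onto a confluent-kafka-python configuration dict along these lines. The bootstrap.servers value is a placeholder assumption (it is not in the report), and the Producer construction is left commented out because it needs a live broker:
> {code:java}
```python
# Client configuration matching the report: acks=1 with tight delivery deadlines.
# linger.ms = 0 means the producer sends batches immediately instead of waiting
# to accumulate more messages.
conf = {
    "bootstrap.servers": "broker:9093",  # placeholder, not from the report
    "acks": 1,
    "linger.ms": 0,
    "delivery.timeout.ms": 100,
    "request.timeout.ms": 80,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
}

# With delivery.timeout.ms = 100, a response that is delayed by ~300ms (as in
# this issue) arrives well after the delivery deadline has expired, so the
# client would report delivery failures rather than just slow requests.
deadline_exceeded_by_spike = 300 > conf["delivery.timeout.ms"]

# from confluent_kafka import Producer
# producer = Producer(conf)
```
> {code}
> Note that the ~300ms spikes described below exceed the 100ms delivery.timeout.ms, which is why the client-side symptom is so visible with this configuration.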
> {code:java}
> 2021-11-26 02:31:30,567 Sent partial ProduceRequest (v7, 0+16527/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent partial ProduceRequest (v7, 16527+16384/37366 bytes, CorrId 2753)
> 2021-11-26 02:31:30,568 Sent ProduceRequest (v7, 37366 bytes @ 32911, CorrId 2753)
> 2021-11-26 02:31:30,570 Sent ProduceRequest (v7, 4714 bytes @ 0, CorrId 2754)
> 2021-11-26 02:31:30,571 Sent ProduceRequest (v7, 1161 bytes @ 0, CorrId 2755)
> 2021-11-26 02:31:30,572 Sent ProduceRequest (v7, 1240 bytes @ 0, CorrId 2756)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2751, rtt 9.79ms)
> 2021-11-26 02:31:30,572 Received ProduceResponse (v7, 69 bytes, CorrId 2752, rtt 10.34ms)
> 2021-11-26 02:31:30,573 Received ProduceResponse (v7, 69 bytes, CorrId 2753, rtt 10.11ms)
> 2021-11-26 02:31:30,872 Received ProduceResponse (v7, 69 bytes, CorrId 2754, rtt 309.69ms)
> 2021-11-26 02:31:30,883 Sent ProduceRequest (v7, 1818 bytes @ 0, CorrId 2757)
> 2021-11-26 02:31:30,887 Sent ProduceRequest (v7, 1655 bytes @ 0, CorrId 2758)
> 2021-11-26 02:31:30,888 Received ProduceResponse (v7, 69 bytes, CorrId 2755, rtt 318.85ms)
> 2021-11-26 02:31:30,893 Sent partial ProduceRequest (v7, 0+16527/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,894 Sent partial ProduceRequest (v7, 16527+16384/37562 bytes, CorrId 2759)
> 2021-11-26 02:31:30,895 Sent ProduceRequest (v7, 37562 bytes @ 32911, CorrId 2759)
> 2021-11-26 02:31:30,896 Sent ProduceRequest (v7, 4700 bytes @ 0, CorrId 2760)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2756, rtt 317.74ms)
> 2021-11-26 02:31:30,897 Received ProduceResponse (v7, 69 bytes, CorrId 2757, rtt 4.22ms)
> 2021-11-26 02:31:30,899 Received ProduceResponse (v7, 69 bytes, CorrId 2758, rtt 2.61ms)
> {code}
>
> The requests with CorrId 2753 and 2754 were sent at almost the same time, but the response for 2754 was delayed by about 300ms.
> We checked the logs on the broker.
>
> {code:java}
> [2021-11-26 02:31:30,873] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=rdkafka, correlationId=2754) – {acks=1,timeout=80,numPartitions=1},response: {responses=[\{topic=***,partition_responses=[{partition=32,error_code=0,base_offset=58625,log_append_time=-1,log_start_offset=49773}]}],throttle_time_ms=0} from connection 10.10.44.59:9093-10.10.0.68:31183-66;totalTime:0.852,requestQueueTime:0.128,localTime:0.427,remoteTime:0.09,throttleTime:0,responseQueueTime:0.073,sendTime:0.131,securityProtocol:SASL_SSL,principal:User:***,listener:SASL_SSL,clientInformation:ClientInformation(softwareName=confluent-kafka-python, softwareVersion=1.5.0-rdkafka-1.5.2) (kafka.request.logger)
> {code}
>
> The time spent on the server side appears to be very small. What, then, is the reason for the latency spikes?
> We also ran tcpdump on the server side and found that the delay comes from the server side.
> The request with CorrId=2754 was received at 10:31:30.566172 and its response was sent at 10:31:30.873518, so the server's processing time for this request is about {*}873-566=307ms{*}.
> Wireshark shows the CorrId=2754 ProduceRequest's timestamp and request info:
> !image-2021-12-21-14-45-22-716.png!
> Wireshark shows the CorrId=2754 ProduceResponse's timestamp and response info:
> !image-2021-12-21-14-44-56-689.png!
>
> We checked the source code and found the problem. The broker processor's run loop is as follows:
> !image-5.png|width=1001,height=449!
> Looking at the poll function, you can see that the {*}poll timeout value is 300ms{*}:
> {code:java}
> private def poll(): Unit = {
>   val pollTimeout = if (newConnections.isEmpty) 300 else 0
>   try selector.poll(pollTimeout)
>   catch {
>     case e @ (_: IllegalStateException | _: IOException) =>
>       // The exception is not re-thrown and any completed sends/receives/connections/disconnections
>       // from this poll will be processed.
>       error(s"Processor $id poll failed", e)
>   }
> }{code}
>
> The following is the selector.poll function:
> !image-6.png!
> So, we may encounter the following situation:
> * First run of the loop:
> ** poll -> request received -> processCompletedReceives -> request added to the request queue.
> * Second run of the loop:
> ** processNewResponses -> response queue is empty (the I/O thread is still processing the request) -> poll() -> select(timeout=0) -> {color:#172b4d}madeReadProgressLastCall = false{color}
> * Third run of the loop:
> ** processNewResponses -> response queue is *NOT* empty -> poll() -> select(timeout=300) returns immediately, because the response data is already available and the fd has been added to the write-fd set.
> * Fourth run of the loop:
> ** response queue is empty -> poll() -> select(timeout=300) waits for 300ms or until new data arrives.
> The server may receive several produce requests at once, but it can only handle one request per connection at a time; only after the previous response has been sent can it handle the next request. While the previous request is being handled, the other requests are kept in a cache. So, if the first response has been sent and no new data arrives at that moment, a cached request may be delayed by up to 300ms before it is processed.
> {color:#ff0000}*We suggest changing the poll timeout value from 300 to 10.*{color}
> *The following two figures show a comparison of the Request-Response RTT values.*
> *!image-3.png!*
> RTT values when the poll timeout value = 300
>
> !image-2.png!
> RTT values when the poll timeout value = 10
>
> Another question: why does the server's log show that the total time is very small? Because the start time is set in the function processCompletedReceives shown below; while a request sits in the cache, the timer has not yet started, so totalTime does not include the time spent in the cache.
> !image-7.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
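> The third and fourth loop runs above can be reproduced with a small stand-alone sketch. This uses Python's selectors module rather than the broker's Java NIO selector, and the 0.3s constant mirrors the hard-coded 300ms; it only illustrates the select-timeout behavior, not the broker itself:
> {code:java}
```python
import selectors
import socket
import time

sel = selectors.DefaultSelector()
# A connected socket pair stands in for a client connection with no pending bytes.
server_side, client_side = socket.socketpair()
server_side.setblocking(False)
sel.register(server_side, selectors.EVENT_READ)

# Fourth loop run: nothing to read and nothing queued to send, so
# select(timeout=300ms) blocks for the full timeout before returning.
start = time.monotonic()
events = sel.select(timeout=0.3)
idle_wait = time.monotonic() - start  # ~0.3s, no events

# Third loop run: data is already pending on the fd, so the same
# select(timeout=300ms) call returns immediately instead of waiting.
client_side.send(b"produce-request")
start = time.monotonic()
events = sel.select(timeout=0.3)
busy_wait = time.monotonic() - start  # near zero, one ready event

sel.close()
server_side.close()
client_side.close()
```
> {code}
> The delayed cached request corresponds to the idle case: its response is ready to be computed, but the processor thread is parked inside the 300ms select, which is why lowering the timeout (or any new inbound data) caps the extra latency.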