[ https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17526071#comment-17526071 ]

Guozhang Wang commented on KAFKA-13838:
---------------------------------------

Also bringing to [~kirktrue]'s radar.

> Improve the poll method of ConsumerNetworkClient
> ------------------------------------------------
>
>                 Key: KAFKA-13838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>
> Briefly, sending a ClientRequest on the Kafka client side takes two steps.
> 1. Selector.send(send)
> Kafka's underlying TCP connection channel ensures that data is sent to the
> network {*}sequentially{*}. A KafkaChannel allows {*}only one send to be set at
> a time{*}, and the next InFlightRequest may be added only if the
> {color:#ff0000}queue.peekFirst().send.completed(){color} condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
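A minimal model of the gating rule above, for illustration only. `InFlightModel`, `SimulatedSend`, and `markWritten` are hypothetical names, not Kafka classes. The sketch assumes, as described, that new requests are pushed to the head of a per-node deque and that another request is allowed only when the newest send has completed and a per-node in-flight limit (a constructor parameter here) has not been reached.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the per-node gating check; not Kafka's classes.
class InFlightModel {
    // Stand-in for a network send: "completed" once all of its bytes are written.
    static final class SimulatedSend {
        private boolean completed;

        boolean completed() { return completed; }

        // In the real client this happens inside a poll, never at send time.
        void markWritten() { completed = true; }
    }

    private final Map<String, Deque<SimulatedSend>> requests = new HashMap<>();
    private final int maxInFlightPerNode;

    InFlightModel(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    // The newest request is pushed to the head, so peekFirst() returns it.
    void add(String node, SimulatedSend send) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(send);
    }

    // A node accepts another request only if nothing is in flight, or if its
    // newest send has completed and the per-node limit is not yet reached.
    boolean canSendMore(String node) {
        Deque<SimulatedSend> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().completed() && queue.size() < maxInFlightPerNode);
    }
}
```

For instance, with a limit of 5, right after `add("node-1", send)` the call `canSendMore("node-1")` returns false, and it becomes true again only once `send.markWritten()` runs, which in the real client can only happen inside a poll.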
> 2. Selector.poll(timeout)
> Each time a send is set on a KafkaChannel, a Selector.poll(timeout) call should
> {*}follow{*}; see the Javadoc of the Selector.send(send) method:
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { ... }{code}
> A send can become *completed* *only after* Selector.poll(timeout) has run; for
> details, see the Selector.write(channel) method.
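The split between staging a send and actually writing it can be sketched with a single-slot channel model. `ChannelModel` is hypothetical and heavily simplified: it writes a whole send in one poll, whereas a real non-blocking socket may need several polls before a send is completed.

```java
// Hypothetical single-slot channel: stage with setSend(), write only in poll().
class ChannelModel {
    private String pendingSend;   // at most one staged send, as described for KafkaChannel
    private int bytesWritten;     // total payload bytes "written to the socket"

    // Analogous to Selector.send(send): stage the send, write nothing yet.
    void setSend(String payload) {
        if (pendingSend != null) {
            throw new IllegalStateException("prior send still in progress");
        }
        pendingSend = payload;
    }

    boolean hasPendingSend() { return pendingSend != null; }

    // Analogous to Selector.poll(timeout): write the staged send and complete it.
    // Simplification: the whole payload is written in a single poll.
    boolean poll() {
        if (pendingSend == null) {
            return false;
        }
        bytesWritten += pendingSend.length();
        pendingSend = null;
        return true;  // a send completed during this poll
    }

    int bytesWritten() { return bytesWritten; }
}
```

Until `poll()` runs, `bytesWritten()` stays at 0 and a second `setSend(...)` is rejected, which is the sense in which a send can only become completed after a poll.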
>  
> Now look again at ConsumerNetworkClient.poll(Timer timer, PollCondition
> pollCondition, boolean disableWakeup). Three places in this method are involved
> in sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. Immediately after the second trySend(...) call, we should call
> client.poll(0, timer.currentTimeMs()) so that the send generated each time is
> consumed by the following Selector.poll() call.
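The effect the reporter describes can be illustrated with a toy model of the three steps. `PollSequenceModel`, `pollCurrent`, and `pollProposed` are hypothetical; the model assumes one node, one send slot, and that a poll always finishes writing whatever is staged. It shows only that, under those assumptions, a send staged by the second trySend(...) stays unwritten until some later poll unless an extra zero-timeout poll follows; it says nothing about how much that matters in practice.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of the trySend -> poll -> trySend sequence for one node.
class PollSequenceModel {
    private final Deque<String> unsent = new ArrayDeque<>();
    private String staged;   // send set on the channel but not yet written
    private int written;     // number of sends fully written

    void enqueue(String request) { unsent.add(request); }

    // Stage the next request only if the channel has no pending send.
    void trySend() {
        if (staged == null && !unsent.isEmpty()) {
            staged = unsent.poll();
        }
    }

    // Stand-in for client.poll(...): writes whatever is staged.
    void clientPoll() {
        if (staged != null) {
            written++;
            staged = null;
        }
    }

    // Current ordering: trySend -> client.poll -> trySend.
    void pollCurrent() {
        trySend();
        clientPoll();
        trySend();
    }

    // Proposed ordering: an extra zero-timeout poll after the second trySend.
    void pollProposed() {
        trySend();
        clientPoll();
        trySend();
        clientPoll();
    }

    int written() { return written; }

    boolean hasStagedSend() { return staged != null; }
}
```

With two queued requests, `pollCurrent()` leaves the second one staged but unwritten, while `pollProposed()` writes both.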
> 2. The while loop in the trySend(...) method can be removed.
> After client.send(request, now) is executed for a node the first time, that
> first request can never become *completed* inside trySend(...), so subsequent
> requests for the same node can never satisfy the client.ready(node, now)
> condition.
> Although the current code breaks out on the loop's second iteration, that is
> still {*}one extra iteration of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
>     long pollDelayMs = maxPollTimeoutMs;
>     // send any requests that can be sent now
>     for (Node node : unsent.nodes()) {
>         Iterator<ClientRequest> iterator = unsent.requestIterator(node);
>         if (iterator.hasNext()) {
>             pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
>             if (client.ready(node, now)) {
>                 client.send(iterator.next(), now);
>                 iterator.remove();
>             }
>         }
>     }
>     return pollDelayMs;
> }{code}
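The extra loop iteration can be counted with a small simulation. `TrySendModel` and its `ready`/`send` helpers are hypothetical stand-ins for client.ready(...) and client.send(...), written under the assumption stated above: once a node has a send that is not yet completed, readiness stays false for the rest of the trySend(...) call. Both methods send the same requests; they differ only in how many readiness checks they perform.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical comparison of the while-loop and single-if shapes of trySend.
class TrySendModel {
    private final Map<String, Deque<String>> unsent = new LinkedHashMap<>();
    private final Map<String, Boolean> hasUncompletedSend = new LinkedHashMap<>();
    int readyChecks;  // how many times ready() was evaluated
    int sent;         // how many requests were handed to send()

    void enqueue(String node, String request) {
        unsent.computeIfAbsent(node, n -> new ArrayDeque<>()).add(request);
    }

    // False once this node has a pending send: nothing in trySend completes it.
    private boolean ready(String node) {
        readyChecks++;
        return !hasUncompletedSend.getOrDefault(node, false);
    }

    private void send(String node) {
        hasUncompletedSend.put(node, true);
        sent++;
    }

    // Shape of the existing loop: keep trying a node until ready() fails.
    void trySendWithWhile() {
        for (Map.Entry<String, Deque<String>> e : unsent.entrySet()) {
            Iterator<String> it = e.getValue().iterator();
            while (it.hasNext()) {
                it.next();
                if (ready(e.getKey())) {
                    send(e.getKey());
                    it.remove();
                } else {
                    break;  // the extra iteration ends here
                }
            }
        }
    }

    // Shape of the proposed code: at most one attempt per node.
    void trySendWithIf() {
        for (Map.Entry<String, Deque<String>> e : unsent.entrySet()) {
            Iterator<String> it = e.getValue().iterator();
            if (it.hasNext()) {
                it.next();
                if (ready(e.getKey())) {
                    send(e.getKey());
                    it.remove();
                }
            }
        }
    }
}
```

With two nodes holding three requests each, both variants send two requests, but the `while` variant evaluates readiness four times and the `if` variant twice.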
> 3. Separately, the unsent.clean() method, which is executed last, can also be
> simplified to make the code easier to read:
> {code:java}
> public void clean() {
>     // the lock protects removal from a concurrent put which could otherwise mutate the
>     // queue after it has been removed from the map
>     synchronized (unsent) {
>         unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
>     }
> } {code}
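The simplification only swaps an explicit iterator loop for `Collection.removeIf`. The self-contained sketch below shows both forms on a stand-in map and checks that they drop the same empty queues. `UnsentCleanModel` and its methods are hypothetical, and the iterator version is written here for comparison rather than copied from Kafka. Both forms keep the `synchronized (unsent)` block, since `removeIf` by itself does not address the race described in the comment (a concurrent put adding to a queue that is being removed); the sketch assumes put synchronizes on the same map.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the unsent map: node id -> queued request names.
class UnsentCleanModel {
    final Map<String, ConcurrentLinkedQueue<String>> unsent = new ConcurrentHashMap<>();

    // Assumed to lock the map so that put and clean cannot interleave.
    void put(String node, String request) {
        synchronized (unsent) {
            unsent.computeIfAbsent(node, n -> new ConcurrentLinkedQueue<>()).add(request);
        }
    }

    // Explicit-iterator form, for comparison.
    void cleanWithIterator() {
        synchronized (unsent) {
            Iterator<ConcurrentLinkedQueue<String>> it = unsent.values().iterator();
            while (it.hasNext()) {
                if (it.next().isEmpty()) {
                    it.remove();
                }
            }
        }
    }

    // Proposed form: the same effect via Collection.removeIf.
    void cleanWithRemoveIf() {
        synchronized (unsent) {
            unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
        }
    }
}
```

Given one non-empty and one empty queue, both methods leave only the non-empty entry in the map.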



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
