[ https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524705#comment-17524705 ]

RivenSun commented on KAFKA-13838:
----------------------------------

After looking at the code carefully, I think the comment on the second trySend call, `buffer space may have been cleared`, means that the send in the KafkaChannel has completed and been reset to null.

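In that case, we should call networkClient.poll(...) again right away. Otherwise, the next time we enter ConsumerNetworkClient.poll(...), its first trySend call is meaningless: the send set by the previous second trySend call has not been written yet, so the channel is still occupied and no new request can be sent to that node.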
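A toy simulation can make this effect concrete. The sketch below is a hypothetical model, not Kafka's code: one node, a channel that holds at most one send, and a network poll that is assumed to write a pending send completely in a single call. It runs the trySend, poll, trySend sequence with and without the proposed extra poll at the end.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of ConsumerNetworkClient.poll(...) for a single node.
// Assumption: one network poll fully writes whatever send is pending on the channel.
class PollSequenceSim {
    final Deque<String> unsent = new ArrayDeque<>();
    String channelSend = null;  // at most one send can be set on the channel
    int written = 0;            // requests fully written to the network
    int wastedTrySends = 0;     // trySend calls that had work but found the channel busy

    // Stand-in for trySend: hand the next request to the channel only if it is free.
    void trySend() {
        if (unsent.isEmpty()) return;
        if (channelSend == null) channelSend = unsent.poll();
        else wastedTrySends++;
    }

    // Stand-in for client.poll(...) -> Selector.poll(timeout): completes the pending send.
    void networkPoll() {
        if (channelSend != null) {
            channelSend = null;
            written++;
        }
    }

    // One invocation; extraPoll adds the proposed client.poll(0, now) after the second trySend.
    void consumerPoll(boolean extraPoll) {
        trySend();
        networkPoll();
        trySend();
        if (extraPoll) networkPoll();
    }

    static PollSequenceSim run(int requests, int invocations, boolean extraPoll) {
        PollSequenceSim sim = new PollSequenceSim();
        for (int i = 0; i < requests; i++) sim.unsent.add("req-" + i);
        for (int i = 0; i < invocations; i++) sim.consumerPoll(extraPoll);
        return sim;
    }
}
```

With four queued requests and two invocations, the variant without the extra poll writes two requests and records one wasted trySend, while the variant with it writes all four. Real Selector.poll calls may only partially write a send, so this only illustrates the ordering argument.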

> Improve the poll method of ConsumerNetworkClient
> ------------------------------------------------
>
>                 Key: KAFKA-13838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>
> Here is a brief description of how a ClientRequest is sent on the Kafka client side. The process is divided into two steps.
> 1. Selector.send(send) method
> Kafka's underlying TCP connection channel ensures that data is sent to the network {*}sequentially{*}. A KafkaChannel allows {*}only one send to be set at a time{*}, and the next InFlightRequest can be added only if the {color:#ff0000}queue.peekFirst().send.completed(){color} condition is met. This check is reached through the following call chain:
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
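The gating condition can be modeled in a few lines. This is a simplified, hypothetical stand-in for InFlightRequests.canSendMore(node): the class and field names are invented, and the "send completed" flag is set by hand instead of by a real Selector.poll(timeout). As in Kafka's InFlightRequests, new requests are added at the head of the per-node deque, so peekFirst() is the most recent request.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-node in-flight check; not Kafka's actual classes.
class InFlightModel {

    // Stand-in for an in-flight request whose network send may still be in progress.
    static final class InFlight {
        boolean sendCompleted; // would become true only after Selector.poll() writes all bytes
    }

    private final Map<String, Deque<InFlight>> requests = new HashMap<>();
    private final int maxInFlightPerConnection;

    InFlightModel(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // A new request is allowed if nothing is in flight, or if the newest request's send
    // has completed and the per-connection limit has not been reached.
    boolean canSendMore(String node) {
        Deque<InFlight> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }

    // Adds a request at the head of the node's deque and returns it.
    InFlight add(String node) {
        InFlight request = new InFlight();
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
        return request;
    }
}
```

Until the newest request's send is marked completed, canSendMore returns false for that node, which is why a second request cannot be sent to it before a poll has written the first one.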
> 2. Selector.poll(timeout)
> Each time a send is set on a KafkaChannel, a Selector.poll(timeout) call should {*}subsequently{*} be made. See the Javadoc comment on the Selector.send(send) method:
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> A send can become *completed* *only after* the Selector.poll(timeout) method has executed; for details, see the Selector.write(channel) method.
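The single-send-slot behavior can be sketched as follows. This is a hypothetical model, loosely in the spirit of KafkaChannel.setSend(...) and Selector.write(channel) but not their actual code: a send is represented only by its remaining byte count, and poll(writable) stands in for a Selector.poll(timeout) that can write at most `writable` bytes.

```java
// Hypothetical model of a channel that holds at most one pending send; not Kafka's code.
class ChannelModel {
    private int pendingBytes = -1; // -1 means no send is currently set

    // Refuses a new send while the previous one is still being written.
    void setSend(int sizeInBytes) {
        if (pendingBytes >= 0)
            throw new IllegalStateException("previous send still in progress");
        pendingBytes = sizeInBytes;
    }

    boolean hasSend() {
        return pendingBytes >= 0;
    }

    // Stand-in for Selector.poll(timeout) -> write(channel): writes up to 'writable' bytes.
    // Returns true only when the send becomes completed, at which point the slot is cleared.
    boolean poll(int writable) {
        if (pendingBytes < 0) return false;
        pendingBytes -= Math.min(writable, pendingBytes);
        if (pendingBytes == 0) {
            pendingBytes = -1;
            return true;
        }
        return false;
    }
}
```

A 100-byte send stays pending after a poll that writes 60 bytes, and a second setSend fails until a further poll finishes the write.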
>  
> Now let's return to the ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) method.
> Three places in this method are involved in sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After the second trySend(...) call, we should immediately call client.poll(0, timer.currentTimeMs()) to ensure that every send generated by trySend is processed by a subsequent Selector.poll() call.
> 2. The while loop in the trySend(...) method can be removed.
> Once client.send(request, now) has been executed for a node, that first request can never become *completed* inside trySend, so subsequent requests for the same node will never satisfy the client.ready(node, now) condition.
> Although the current code breaks out on the second iteration of the loop, that iteration is {*}an unnecessary extra pass through the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
>     long pollDelayMs = maxPollTimeoutMs;
>     // send any requests that can be sent now
>     for (Node node : unsent.nodes()) {
>         Iterator<ClientRequest> iterator = unsent.requestIterator(node);
>         if (iterator.hasNext()) {
>             pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
>             if (client.ready(node, now)) {
>                 client.send(iterator.next(), now);
>                 iterator.remove();
>             }
>         }
>     }
>     return pollDelayMs;
> }{code}
> 3. In addition, the unsent.clean() method, which is executed last, can be simplified to make the code easier to read:
> {code:java}
> public void clean() {
>     // the lock protects removal from a concurrent put which could otherwise mutate the
>     // queue after it has been removed from the map
>     synchronized (unsent) {
>         unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
>     }
> } {code}
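To check that the removeIf version of clean() behaves like an explicit iterator loop, here is a small self-contained sketch. UnsentModel is a hypothetical stand-in for UnsentRequests that uses String node ids and request names; the iterator-based method is written only for comparison and may not match Kafka's current clean() line for line. Both rely on ConcurrentHashMap's values view supporting removal.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for UnsentRequests: node id -> queue of pending request names.
class UnsentModel {
    final ConcurrentMap<String, ConcurrentLinkedQueue<String>> unsent = new ConcurrentHashMap<>();

    // Puts take the same lock, so clean() cannot drop a queue that a put is about to fill.
    void put(String node, String request) {
        synchronized (unsent) {
            unsent.computeIfAbsent(node, k -> new ConcurrentLinkedQueue<>()).add(request);
        }
    }

    // Explicit-iterator form, for comparison.
    void cleanWithIterator() {
        synchronized (unsent) {
            Iterator<ConcurrentLinkedQueue<String>> it = unsent.values().iterator();
            while (it.hasNext()) {
                if (it.next().isEmpty()) it.remove();
            }
        }
    }

    // removeIf form proposed above: same effect, less code.
    void cleanWithRemoveIf() {
        synchronized (unsent) {
            unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
        }
    }
}
```

After draining one node's queue, either cleanup method removes that node's entry and keeps the node that still has a pending request.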



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
