[ https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525168#comment-17525168 ]
Guozhang Wang commented on KAFKA-13838:
---------------------------------------

cc [~hachikuji] as you are working on the threading refactoring of the consumer, and may piggy-back the consumer network polling mechanism.

> Improve the poll method of ConsumerNetworkClient
> ------------------------------------------------
>
>                 Key: KAFKA-13838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>
> Here is a brief description of how a ClientRequest is sent on the Kafka
> client side. The process has two steps.
> 1. Selector.send(send)
> Kafka's underlying TCP connection channel ensures that data is sent to the
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at
> a time{*}, and the next InFlightRequest may be added only if the
> {color:#ff0000}queue.peekFirst().send.completed(){color} condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) ->
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> Each time a send is set on a KafkaChannel, a Selector.poll(timeout) call
> should {*}follow{*}. See the comment on the Selector.send(send) method:
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> A send can become *completed* *only after* Selector.poll(timeout) has
> executed; for more detail, see the Selector.write(channel) method.
>
> Now let's go back to the ConsumerNetworkClient.poll(Timer timer,
> PollCondition pollCondition, boolean disableWakeup) method.
> Three places in this method are involved in sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
> client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After the second call to trySend(...), we should immediately call
> client.poll(0, timer.currentTimeMs()) to ensure that every send that is
> generated can be consumed by the next Selector.poll() call.
> 2. The while loop in the trySend(...) method can be removed.
> After client.send(request, now) is executed for a node the first time, that
> request cannot become *completed* inside trySend(...), because no poll
> happens there, so subsequent requests for the same node can never satisfy
> the client.ready(node, now) condition.
> The current code does break out of the loop on its second iteration, but
> that is still {*}one additional, unnecessary iteration{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
>     long pollDelayMs = maxPollTimeoutMs;
>     // send any requests that can be sent now
>     for (Node node : unsent.nodes()) {
>         Iterator<ClientRequest> iterator = unsent.requestIterator(node);
>         if (iterator.hasNext()) {
>             pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
>             // at most one request per node can be sent before the next poll
>             if (client.ready(node, now)) {
>                 client.send(iterator.next(), now);
>                 iterator.remove();
>             }
>         }
>     }
>     return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method, which is executed last, can also
> be simplified so that the code is easier to read:
> {code:java}
> public void clean() {
>     // the lock protects removal from a concurrent put which could otherwise mutate the
>     // queue after it has been removed from the map
>     synchronized (unsent) {
>         unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
>     }
> } {code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
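
The behaviour described in point 2 can be sketched with a toy model. This is only an illustration, not Kafka code: ToyChannel, trySendWithLoop, and trySendOnce are hypothetical names, and the model assumes (as the issue states) that a node is not ready again until a poll has completed the pending send.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class TrySendSketch {

    /** Hypothetical stand-in for one node's connection: one pending send at a time. */
    static final class ToyChannel {
        private boolean sendPending = false;

        /** Ready only when no earlier send is still waiting for a poll. */
        boolean ready() {
            return !sendPending;
        }

        void send(String request) {
            if (sendPending) {
                throw new IllegalStateException("previous send not completed");
            }
            sendPending = true;
        }

        /** Models a poll that completes the pending send. */
        void poll() {
            sendPending = false;
        }
    }

    /** Loop variant: tries to drain the queue, but breaks once the channel is not ready. */
    static int trySendWithLoop(ToyChannel channel, Deque<String> unsent) {
        int sent = 0;
        while (!unsent.isEmpty()) {
            if (!channel.ready()) {
                break; // always reached on the second iteration in this model
            }
            channel.send(unsent.poll());
            sent++;
        }
        return sent;
    }

    /** Single-check variant: sends at most one request, with no extra iteration. */
    static int trySendOnce(ToyChannel channel, Deque<String> unsent) {
        if (!unsent.isEmpty() && channel.ready()) {
            channel.send(unsent.poll());
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        Deque<String> a = new ArrayDeque<>(List.of("r1", "r2", "r3"));
        Deque<String> b = new ArrayDeque<>(List.of("r1", "r2", "r3"));
        System.out.println("loop variant sent: " + trySendWithLoop(new ToyChannel(), a));
        System.out.println("single-check variant sent: " + trySendOnce(new ToyChannel(), b));
    }
}
```

In this model both variants send the same number of requests between two polls, which is the reason the issue gives for replacing the while loop with a single if.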