[ https://issues.apache.org/jira/browse/KAFKA-13829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522182#comment-17522182 ]
Luke Chen commented on KAFKA-13829:
-----------------------------------

[~RivenSun], thanks for reporting it. I see there is a test, `NetworkClientTest#testDisconnectWithMultipleInFlights`, that checks the in-flight request count. Could you please check whether it does what you expect? If not, I'd suggest you submit a PR with a test that confirms the bug, which will be much clearer than explaining it in words. After all, you will need a test to verify the fix anyway.

Thanks.

> The function of max.in.flight.requests.per.connection parameter does not
> work, it conflicts with the underlying NIO sending data
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13829
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: RivenSun
>            Priority: Major
>
> Because of how Kafka's underlying NIO layer handles the `OP_WRITE` event,
> the `max.in.flight.requests.per.connection` parameter has no effect. This
> greatly affects Kafka's network send performance.
> The way Kafka's Selector sends a ClientRequest can be divided into the
> following two major steps:
> h2. 1) Prepare the request data to be sent
> 1. NetworkClient.ready(Node node, long now) ->
> NetworkClient.canSendRequest(...) is called to determine whether the
> request is eligible to be sent.
> 2. NetworkClient.doSend(...): construct `Send send` and cache the request
> via inFlightRequests.add(inFlightRequest).
> 3. Execute the `KafkaChannel.setSend()` method:
> Check that `this.send` *is currently null*; otherwise {*}an
> IllegalStateException is thrown{*};
> Set the `this.send` value;
> transportLayer adds the `OP_WRITE` event.
>
> h2. 2) Selector.poll(long timeout) should then be called to consume the send and send data to the network.
>
> {code:java}
> Selector.poll() -> this.nioSelector.select(timeout)
> Selector.pollSelectionKeys() ->
> Selector.attemptWrite() ->
> Selector.write(channel) ->
> KafkaChannel.write() & KafkaChannel.maybeCompleteSend(){code}
>
> 1. Selector.poll -> this.nioSelector.select(timeout): get the previously
> registered `OP_WRITE` event.
> 2. Execute the `Selector.attemptWrite()` method.
> 3. In the `KafkaChannel.write()` method, after the data is successfully
> sent, update `this.send` to the completed state.
> 4. In the `KafkaChannel.maybeCompleteSend()` method, check whether send is
> in the completed state; if it is not, do nothing.
> 5. If send is completed, transportLayer removes the `OP_WRITE` event and
> resets the KafkaChannel.send object to null.
> 6. Wait for the next request data that is ready to be sent.
>
> The process above looks fine, but if you read
> NetworkClient.canSendRequest(...) carefully, you will find this condition
> in inFlightRequests.canSendMore(node):
> {code:java}
> queue.peekFirst().send.completed(){code}
> Secondly, the `inFlightRequests.add(inFlightRequest)` method calls
> *addFirst(request)*.
>
> Currently, *only one send object* is stored in KafkaChannel, not a
> sendObject {*}collection{*};
> Between the registration and removal of the OP_WRITE event, *only one send
> object will be sent* in the KafkaChannel.write() method and *only one send
> object will be completed* in the KafkaChannel.maybeCompleteSend() method.
> So whether the clientRequest is eligible to be sent is
> {color:#ff0000}*limited only by
> queue.peekFirst().send.completed()*{color}, the
> {color:#ff0000}*max.in.flight.requests.per.connection*{color} parameter
> loses its effect, and {*}{color:#ff0000}setting it to a value greater than
> 1 is equivalent to setting it to 1{color}{*}.
> h2.
> Suggest
> Given the KafkaClient architecture, we do not need to consider concurrent
> calls to the `NetworkClient.poll` method from multiple threads.
> 1. Remove the following condition from NetworkClient.canSendRequest(...):
> {code:java}
> queue.peekFirst().send.completed(){code}
> The `max.in.flight.requests.per.connection` parameter will then take
> effect again.
> 2. Currently, KafkaChannel.setSend() registers the `OP_WRITE` event and
> KafkaChannel.maybeCompleteSend() removes it.
> That may no longer be appropriate. Because we want to send more than one
> send object between the registration and removal of `OP_WRITE` events, it
> is recommended {*}not to repeatedly register and remove `OP_WRITE`
> events{*}, but instead to {*}register the `OP_WRITE` event in the
> `transportLayer.finishConnect()` method{*}:
> {code:java}
> key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT |
> SelectionKey.OP_READ | SelectionKey.OP_WRITE);{code}
> 3. The KafkaChannel.setSend() method no longer caches only *a single*
> incoming send object; instead, each incoming send object is stored in a
> *sendCollection* structure.
> 4. When KafkaChannel.write() is executed, copy sendCollection to
> {*}midWriteSendCollection{*}, then clear sendCollection, and finally send
> all the data in {*}midWriteSendCollection{*}.
> 5. In the KafkaChannel.maybeCompleteSend() method, determine whether
> midWriteSendCollection contains any completed sends, remove all
> completedSends from midWriteSendCollection, and return completedSends so
> they can be added to Selector.completedSends.
> 6. Selector.attemptRead(channel) only runs when the precondition
> `{*}!hasCompletedReceive(channel){*}` holds, so the KafkaChannel.receive
> object does not need to be changed.
> 7. One more thing:
> the ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition,
> boolean disableWakeup) method.
> There are three steps in this method that involve sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
> client.poll(...)
> ->
> trySend(timer.currentTimeMs());{code}
> There is a problem with this process:
> right after the second call to trySend(...), we should immediately call
> client.poll(0, timer.currentTimeMs()) to ensure that the send generated
> each time can be consumed by the next Selector.poll() call.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
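
The admission condition discussed in the quoted report can be sketched as a small toy model, in the spirit of the suggestion to express the behaviour as a test. This is not Kafka code: `InFlightModel`, `ToySend`, `markWritten` and `completeOldest` are hypothetical names, and the assumption that the check also compares the queue size with the configured limit is an assumption of this sketch, not something stated in the report. The model only shows how the two conditions interact; whether the real `NetworkClient` behaves the same way would need a test against it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (NOT Kafka code) of a per-node "can send more" check. */
public class InFlightModel {

    /** Hypothetical stand-in for a Send: completed means fully written to the socket. */
    static final class ToySend {
        private boolean completed;
        boolean completed() { return completed; }
        void markWritten() { completed = true; }
    }

    private final Deque<ToySend> queue = new ArrayDeque<>();
    private final int maxInFlight;

    InFlightModel(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Assumed shape of the check: newest send fully written AND queue below the limit. */
    boolean canSendMore() {
        return queue.isEmpty()
            || (queue.peekFirst().completed() && queue.size() < maxInFlight);
    }

    /** The newest request goes to the head, mirroring the addFirst(request) call in the report. */
    ToySend send() {
        if (!canSendMore()) {
            throw new IllegalStateException("cannot send more requests");
        }
        ToySend s = new ToySend();
        queue.addFirst(s);
        return s;
    }

    /** Assumption: a response completes the oldest request, which sits at the tail. */
    void completeOldest() {
        queue.pollLast();
    }

    int inFlightCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        InFlightModel model = new InFlightModel(2);
        ToySend first = model.send();
        System.out.println("newest send not yet written: canSendMore = " + model.canSendMore());
        first.markWritten();
        System.out.println("newest send written:         canSendMore = " + model.canSendMore());
        ToySend second = model.send();
        second.markWritten();
        System.out.println("in flight = " + model.inFlightCount()
            + ", canSendMore = " + model.canSendMore());
    }
}
```

With a limit of 2, the model refuses a second request while the newest send is still being written, admits it once that send is written, and then refuses a third because the queue is full.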