[ 
https://issues.apache.org/jira/browse/KAFKA-13829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522182#comment-17522182
 ] 

Luke Chen commented on KAFKA-13829:
-----------------------------------

[~RivenSun], thanks for reporting it. I saw there's a test 
`NetworkClientTest#testDisconnectWithMultipleInFlights` that checks the 
in-flight request count. Could you please check it and see whether it is what 
you expect? If not, I'd suggest submitting a PR with a test that confirms the 
bug, which will be much clearer than explaining it in words. After all, you 
need a test for the fix anyway. Thanks.

> The function of max.in.flight.requests.per.connection parameter does not 
> work, it conflicts with the underlying NIO sending data
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13829
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: RivenSun
>            Priority: Major
>
> Due to the implementation mechanism of the `OP_WRITE` event of Kafka's 
> underlying NIO, the function of the `max.in.flight.requests.per.connection` 
> parameter does not work. This will greatly affect Kafka's network sending 
> performance.
> The process of Kafka's Selector sending ClientRequest can be simply divided 
> into the following two major steps:
> h2. 1) Prepare the request data to be sent
> 1. NetworkClient.ready(Node node, long now) -> 
> NetworkClient.canSendRequest(...) method is called: determine whether the 
> request is eligible to be sent.
> 2. NetworkClient.doSend(...): construct `Send send` and cache the request via 
> inFlightRequests.add(inFlightRequest).
> 3. Execute the `KafkaChannel.setSend()` method:
> check that `this.send` is currently *null*, otherwise {*}an 
> IllegalStateException is thrown{*}; 
> set `this.send`;
> transportLayer adds the `OP_WRITE` event.
>  
> h2. 2) Selector.poll(long timeout) should then be called to consume the send 
> and send data to the network.
>  
> {code:java}
> Selector.poll() -> this.nioSelector.select(timeout)
> Selector.pollSelectionKeys() -> 
> Selector.attemptWrite() -> 
> Selector.write(channel) -> 
> KafkaChannel.write() & KafkaChannel.maybeCompleteSend(){code}
>  
> 1. Selector.poll -> this.nioSelector.select(timeout): get the previously 
> registered `OP_WRITE` event
> 2. Execute the `Selector.attemptWrite()` method
> 3. In the `KafkaChannel.write()` method, after the data is successfully sent, 
> update `this.send` to the completed state.
> 4. In the `KafkaChannel.maybeCompleteSend()` method, check whether send is in 
> the completed state, otherwise do nothing.
> 5. If send is completed, transportLayer removes the `OP_WRITE` event and 
> resets the KafkaChannel.send object to null.
> 6. Wait for the next request data that is ready to be sent.
>  
> The process above looks fine, but a careful reading of 
> NetworkClient.canSendRequest(...) shows that 
> inFlightRequests.canSendMore(node) contains this condition:
> {code:java}
> queue.peekFirst().send.completed(){code}
> Secondly, the `inFlightRequests.add(inFlightRequest)` method also calls 
> *addFirst(request).*
>  
> Currently, *only one send object* is stored in KafkaChannel, not a sendObject 
> {*}collection{*};
> Between registration and removal of the OP_WRITE event, *only one send object 
> will be sent* in the KafkaChannel.write() method and *only one send object 
> will be completed* in the KafkaChannel.maybeCompleteSend() method.
> So whether the clientRequest is eligible to be sent is {color:#ff0000}*limited 
> only by queue.peekFirst().send.completed()*{color}; the 
> {color:#ff0000}*max.in.flight.requests.per.connection*{color} parameter 
> loses its effect, and {*}{color:#ff0000}setting it to a value greater than 1 
> is equivalent to setting it to 1{color}{*}.
> h2. Suggestion
> Due to the KafkaClient architecture, we do not need to consider the 
> concurrent execution of multiple threads of the `NetworkClient.poll` method.
> 1. Remove the following condition from NetworkClient.canSendRequest(...):
> {code:java}
> queue.peekFirst().send.completed(){code}
> The `max.in.flight.requests.per.connection` parameter will then take effect 
> again.
> 2. Currently, KafkaChannel.setSend() registers the `OP_WRITE` event and 
> KafkaChannel.maybeCompleteSend() removes it.
> This may no longer be appropriate. Because we want to send more than one 
> send object between registration and removal of the `OP_WRITE` event, it 
> is recommended {*}not to repeatedly register and remove `OP_WRITE` events{*}, 
> but to {*}register the `OP_WRITE` event in the 
> `transportLayer.finishConnect()` method{*}:
> {code:java}
> key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | 
> SelectionKey.OP_READ | SelectionKey.OP_WRITE);{code}
> 3. KafkaChannel.setSend() no longer caches only *a single* incoming send 
> object; instead, each incoming send object is stored in a *sendCollection* 
> structure.
> 4. When KafkaChannel.write() is executed, copy sendCollection to 
> {*}midWriteSendCollection{*}, then clear sendCollection, and finally send all 
> the data in {*}midWriteSendCollection{*}.
> 5. In the KafkaChannel.maybeCompleteSend() method, determine whether there is 
> a completed state send in the midWriteSendCollection, and then remove all 
> completedSends from the midWriteSendCollection, and return completedSends for 
> adding into Selector.completedSends.
> 6. Selector.attemptRead(channel) method execution has preconditions: 
> `{*}!hasCompletedReceive(channel){*}`, so the KafkaChannel.receive object 
> does not need to be changed.
> 7. A little extra:
> ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean 
> disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());{code}
> There is a problem with this process: 
> after calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()) to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
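
The admission check discussed in the description can be modeled in isolation, which may help when writing the suggested test. The following is a minimal sketch, not Kafka's actual classes: `InFlightModel`, `FakeSend`, `markCompleted`, `completeOldest` and `inFlightCount` are hypothetical names, and the sketch assumes the check combines `queue.peekFirst().send.completed()` with a comparison of the queue size against `max.in.flight.requests.per.connection`. It also assumes a request stays in the queue after its send completes, until a response removes it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a per-node in-flight request queue.
class InFlightModel {
    /** Hypothetical stand-in for a network send; it only tracks completion. */
    static final class FakeSend {
        private boolean completed;
        boolean completed() { return completed; }
        void markCompleted() { completed = true; }
    }

    private final Deque<FakeSend> queue = new ArrayDeque<>();
    private final int maxInFlight; // models max.in.flight.requests.per.connection

    InFlightModel(int maxInFlight) { this.maxInFlight = maxInFlight; }

    /** Assumed shape of the check: newest send written AND queue below the limit. */
    boolean canSendMore() {
        return queue.isEmpty()
            || (queue.peekFirst().completed() && queue.size() < maxInFlight);
    }

    /** Newest request goes to the head, matching the addFirst(request) call. */
    FakeSend add() {
        if (!canSendMore()) throw new IllegalStateException("cannot send more");
        FakeSend send = new FakeSend();
        queue.addFirst(send);
        return send;
    }

    /** Models a response arriving for the oldest request (assumption). */
    void completeOldest() { queue.pollLast(); }

    int inFlightCount() { return queue.size(); }

    public static void main(String[] args) {
        InFlightModel model = new InFlightModel(2);
        FakeSend first = model.add();
        System.out.println(model.canSendMore());   // false: newest send not yet written
        first.markCompleted();
        System.out.println(model.canSendMore());   // true: written, and 1 < 2
        model.add();
        System.out.println(model.inFlightCount()); // 2
    }
}
```

In this model the outcome depends on when a send is marked completed relative to when its request leaves the queue, which is the behavior a test against the real `NetworkClient` would need to pin down.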



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
