[ 
https://issues.apache.org/jira/browse/KAFKA-13829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13829:
-----------------------------
    Description: 
Because of how Kafka's underlying NIO layer handles the `OP_WRITE` event, the 
`max.in.flight.requests.per.connection` parameter has no effect. This greatly 
reduces Kafka's network send performance.



The process by which Kafka's Selector sends a ClientRequest can be divided 
into the following two major steps:
h2. 1) Prepare the request data to be sent

1. NetworkClient.ready(Node node, long now) -> 
NetworkClient.canSendRequest(...) is called to determine whether the 
request is eligible to be sent.

2. NetworkClient.doSend(...): constructs `Send send` and caches the request via 
inFlightRequests.add(inFlightRequest).

3. Execute the `KafkaChannel.setSend()` method:
Check that `this.send` *is currently null*; otherwise {*}an 
IllegalStateException is thrown{*}; 
Set the `this.send` value;
transportLayer adds the `OP_WRITE` event.

 
h2. 2) Selector.poll(long timeout) should then be called to consume the send and send data to the network

 
{code:java}
Selector.poll() -> this.nioSelector.select(timeout)
Selector.pollSelectionKeys() -> 
Selector.attemptWrite() -> 
Selector.write(channel) -> 
KafkaChannel.write() & KafkaChannel.maybeCompleteSend(){code}
 

1. Selector.poll() -> this.nioSelector.select(timeout): get the previously 
registered `OP_WRITE` event.

2. Execute the `Selector.attemptWrite()` method

3. In the `KafkaChannel.write()` method, after the data is successfully sent, 
update `this.send` to the completed state.

4. In the `KafkaChannel.maybeCompleteSend()` method, check whether send is in 
the completed state; if it is not, do nothing.

5. If send is completed, transportLayer removes the `OP_WRITE` event and resets 
the KafkaChannel.send object to null.

6. Wait for the next request data that is ready to be sent.
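
The two steps above can be modelled with a small sketch. This is not Kafka's code: `SingleSlotChannel` is a hypothetical stand-in for KafkaChannel, a `String` stands in for the `Send` object, a boolean stands in for the `OP_WRITE` interest bit, and the sketch assumes that one `write()` call always drains the whole send.

```java
// Hypothetical model of the single-slot send cycle described above.
// It is NOT Kafka's KafkaChannel; all names here are for illustration only.
class SingleSlotChannel {
    private String send;            // stands in for the pending Send object
    private boolean sendCompleted;  // stands in for send.completed()
    private boolean writeInterest;  // stands in for the OP_WRITE interest bit

    // Step 1.3: only one send may be pending at a time.
    void setSend(String newSend) {
        if (send != null) {
            throw new IllegalStateException("a send is already in progress");
        }
        send = newSend;
        sendCompleted = false;
        writeInterest = true;       // "transportLayer adds the OP_WRITE event"
    }

    // Step 2.3: assumption: a single write always sends all of the data.
    void write() {
        if (send != null) {
            sendCompleted = true;
        }
    }

    // Steps 2.4 and 2.5: return the completed send, or null if nothing completed.
    String maybeCompleteSend() {
        if (send == null || !sendCompleted) {
            return null;
        }
        String completed = send;
        send = null;                // the slot becomes free again
        writeInterest = false;      // "transportLayer removes the OP_WRITE event"
        return completed;
    }

    boolean hasWriteInterest() {
        return writeInterest;
    }

    public static void main(String[] args) {
        SingleSlotChannel channel = new SingleSlotChannel();
        channel.setSend("request-1");
        try {
            channel.setSend("request-2");   // rejected: the slot is occupied
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        channel.write();
        System.out.println("completed: " + channel.maybeCompleteSend());
        channel.setSend("request-2");       // accepted now that the slot is free
    }
}
```

In this model a second send can only be handed to the channel after the first one has gone through `write()` and `maybeCompleteSend()`.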

 

The process above looks correct, but a careful reading of 
NetworkClient.canSendRequest(...) shows that 
inFlightRequests.canSendMore(node) contains the following condition:
{code:java}
queue.peekFirst().send.completed(){code}
In addition, the `inFlightRequests.add(inFlightRequest)` method calls 
*addFirst(request)*, so queue.peekFirst() returns the most recently added 
request.
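
The interaction between `addFirst` and `peekFirst` can be illustrated with a small model. This is a sketch, not Kafka's InFlightRequests class: the names `InFlightSketch`, `Request`, `sendCompleted` and `maxInFlight` are hypothetical, and combining the completed check with a queue-size check is an assumption about how the limit is applied.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the gating condition discussed above.
// It is NOT Kafka's InFlightRequests; names and structure are illustrative.
class InFlightSketch {
    static final class Request {
        final String id;
        boolean sendCompleted;      // stands in for send.completed()

        Request(String id) {
            this.id = id;
        }
    }

    private final Deque<Request> queue = new ArrayDeque<>();
    private final int maxInFlight;  // stands in for max.in.flight.requests.per.connection

    InFlightSketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // New requests go to the head of the deque, as add() is described to do.
    void add(Request request) {
        queue.addFirst(request);
    }

    // Assumption: the newest request's send must be completed AND the
    // queue must be below the configured limit.
    boolean canSendMore() {
        return queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlight);
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch(5);
        Request first = new Request("r1");
        inFlight.add(first);
        System.out.println("send pending:   " + inFlight.canSendMore());
        first.sendCompleted = true;
        System.out.println("send completed: " + inFlight.canSendMore());
    }
}
```

Because the head of the deque is always the newest request, the completed check in this model refers to the request that was handed to the channel most recently, regardless of how large `maxInFlight` is.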

 

Currently, *only one send object* is stored in KafkaChannel, not a 
{*}collection{*} of send objects.
Between the registration and the removal of the OP_WRITE event, *only one send 
object will be sent* in the KafkaChannel.write() method and *only one send 
object will be completed* in the KafkaChannel.maybeCompleteSend() method.

So whether the clientRequest is eligible to be sent is {color:#ff0000}*limited 
only by queue.peekFirst().send.completed()*{color}; the 
{color:#ff0000}*max.in.flight.requests.per.connection*{color} parameter 
loses its effect, and {*}{color:#ff0000}setting it to a value greater than 1 is 
equivalent to setting it to 1{color}{*}.
h2. Suggestion

Because of the KafkaClient architecture, we do not need to consider concurrent 
calls to the `NetworkClient.poll` method from multiple threads.

1. Remove the following condition from NetworkClient.canSendRequest(...): 
{code:java}
queue.peekFirst().send.completed(){code}
The `max.in.flight.requests.per.connection` parameter will then take effect again.

2. Currently, KafkaChannel.setSend() registers the `OP_WRITE` event and 
KafkaChannel.maybeCompleteSend() removes it.
This may no longer be appropriate. Because we want to send more than one send 
object between the registration and the removal of the `OP_WRITE` event, it is 
recommended {*}not to repeatedly register and remove `OP_WRITE` events{*}, but 
instead to {*}register the `OP_WRITE` event in the 
`transportLayer.finishConnect()` method{*}:
{code:java}
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | 
SelectionKey.OP_READ | SelectionKey.OP_WRITE);{code}
3. The KafkaChannel.setSend() method no longer caches only *one* incoming send 
object; instead, each incoming send object is stored in a *sendCollection* 
structure.

4. When KafkaChannel.write() is executed, copy sendCollection to 
{*}midWriteSendCollection{*}, then clear sendCollection, and finally send all 
the data in {*}midWriteSendCollection{*}.

5. In the KafkaChannel.maybeCompleteSend() method, check whether the 
midWriteSendCollection contains any completed sends, remove all of them 
from the midWriteSendCollection, and return them as completedSends so that 
they can be added to Selector.completedSends.

6. Selector.attemptRead(channel) has a precondition, 
`{*}!hasCompletedReceive(channel){*}`, so the KafkaChannel.receive object does 
not need to be changed.

7. One additional point concerns the 
ConsumerNetworkClient.poll(Timer timer, PollCondition pollCondition, boolean 
disableWakeup) method.
Three calls in this method are involved in sending data:
{code:java}
long pollDelayMs = trySend(timer.currentTimeMs());
->
 client.poll(...)
->
trySend(timer.currentTimeMs());{code}
There is a problem with this process: 
after calling the trySend(...) method for the second time, we should immediately 
call client.poll(0, timer.currentTimeMs()) to ensure that the send generated 
each time can be consumed by the next Selector.poll() call.
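
Items 3 to 5 of the suggestion can be sketched as follows. This is only an illustration of the proposed data flow, not a patch: `QueuedChannelSketch` and `PendingSend` are hypothetical names, the collection names are taken from the text above, and the sketch assumes that every send is fully written by a single `write()` call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the proposed multi-send channel (items 3 to 5).
// It is NOT Kafka code; class and field names are illustrative.
class QueuedChannelSketch {
    static final class PendingSend {
        final String id;
        boolean completed;

        PendingSend(String id) {
            this.id = id;
        }
    }

    private final Deque<PendingSend> sendCollection = new ArrayDeque<>();
    private final Deque<PendingSend> midWriteSendCollection = new ArrayDeque<>();

    // Item 3: queue the incoming send instead of rejecting it.
    void setSend(PendingSend send) {
        sendCollection.addLast(send);
    }

    // Item 4: move everything queued so far into the mid-write collection,
    // clear the queue, then write the mid-write sends in order.
    void write() {
        midWriteSendCollection.addAll(sendCollection);
        sendCollection.clear();
        for (PendingSend send : midWriteSendCollection) {
            send.completed = true;  // assumption: one call writes the whole send
        }
    }

    // Item 5: remove and return every completed send, preserving order.
    List<PendingSend> maybeCompleteSends() {
        List<PendingSend> completedSends = new ArrayList<>();
        Iterator<PendingSend> it = midWriteSendCollection.iterator();
        while (it.hasNext()) {
            PendingSend send = it.next();
            if (send.completed) {
                completedSends.add(send);
                it.remove();
            }
        }
        return completedSends;
    }

    public static void main(String[] args) {
        QueuedChannelSketch channel = new QueuedChannelSketch();
        channel.setSend(new PendingSend("request-1"));
        channel.setSend(new PendingSend("request-2"));  // no exception this time
        channel.write();
        for (PendingSend send : channel.maybeCompleteSends()) {
            System.out.println("completed: " + send.id);
        }
    }
}
```

A real implementation would also have to handle partially written sends, which this sketch leaves out.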


> The function of max.in.flight.requests.per.connection parameter does not 
> work, it conflicts with the underlying NIO sending data
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13829
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13829
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: RivenSun
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
