Jiangjie Qin created KAFKA-1905:
-----------------------------------

             Summary: KafkaProducer's performance could be halved when 
MaxInFlightRequest is set to 1
                 Key: KAFKA-1905
                 URL: https://issues.apache.org/jira/browse/KAFKA-1905
             Project: Kafka
          Issue Type: Bug
            Reporter: Jiangjie Qin


In KafkaProducer, each poll() follows this logic:
1. Get the list of nodes that have a batch available for sending.
2. Filter the list to remove any node that is not ready to receive a new 
request (MaxInFlightRequests is checked here).
3. Compose requests for the nodes in the filtered list, i.e. those that have a 
batch to send and are also ready to receive.
4. Increase InFlightRequests, send the requests, and get the responses to 
previous sends.
5. Handle all receives and decrease InFlightRequests.

In this case, when MaxInFlightRequest is set to 1, the node is still considered 
not ready even if its response has already arrived, because InFlightRequests is 
checked before the receives are handled. So a sequence of polls ends up in a 
PollForSend - PollForReceive - PollForSend... pattern, which essentially halves 
the throughput on a fast network. Ideally we should check whether a node is 
ready after we have handled all the receives.
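
The effect of the ordering can be illustrated with a toy model. The sketch 
below is not Kafka code: the class PollOrderSimulation, the method countSends 
and the flag checkReadyAfterReceive are hypothetical names introduced only for 
illustration. It assumes a single node, a maximum of 1 in-flight request, a 
batch always available to send, and a network fast enough that the response to 
a request is readable by the next poll.

```java
// Toy model of the poll loop described above; not the actual Sender/NetworkClient code.
final class PollOrderSimulation {

    /**
     * Counts how many requests are sent over the given number of polls.
     *
     * @param polls                  number of poll() iterations to simulate
     * @param checkReadyAfterReceive false: readiness is checked before receives are
     *                               handled (the ordering described in this issue);
     *                               true: receives are handled first (the proposed ordering)
     */
    static int countSends(int polls, boolean checkReadyAfterReceive) {
        final int maxInFlight = 1; // assumption: MaxInFlightRequest set to 1
        int inFlight = 0;          // requests sent but not yet acknowledged
        int sends = 0;

        for (int i = 0; i < polls; i++) {
            // Fast-network assumption: every request sent in an earlier poll
            // already has a response waiting to be read in this poll.
            int responsesAvailable = inFlight;

            if (checkReadyAfterReceive) {
                inFlight -= responsesAvailable; // handle receives first
            }

            // Readiness check: the node accepts a request only below the limit.
            if (inFlight < maxInFlight) {
                inFlight++;
                sends++;
            }

            if (!checkReadyAfterReceive) {
                inFlight -= responsesAvailable; // handle receives last
            }
        }
        return sends;
    }

    public static void main(String[] args) {
        System.out.println("check before receive: " + countSends(10, false) + " sends in 10 polls");
        System.out.println("check after receive:  " + countSends(10, true) + " sends in 10 polls");
    }
}
```

Under these assumptions the first ordering sends on every other poll (5 sends 
in 10 polls), while the second sends on every poll (10 sends in 10 polls). 
That matches the alternating pattern described above, although the real client 
interleaves sends and receives inside the selector, so an actual fix may be 
structured differently.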

Here are some logs that show this situation when I run 
kafka-producer-perf-test locally.
---- 1st poll for send, no receive ----
[2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
---- 2nd poll for receive, no send ----
[2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
---- 3rd poll for send, no receive ----
[2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
---- 4th poll for receive, no send ----
[2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
---- 5th poll for send, no receive ----
[2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
---- 6th poll for receive, no send ----
[2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
.........



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
