Jiangjie Qin created KAFKA-1905:
-----------------------------------

             Summary: KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1
                 Key: KAFKA-1905
                 URL: https://issues.apache.org/jira/browse/KAFKA-1905
             Project: Kafka
          Issue Type: Bug
            Reporter: Jiangjie Qin
In KafkaProducer, the following logic is used in each poll():
1. Get a list of nodes that have a batch available for sending.
2. Filter the list to remove nodes that are not ready to receive a new request (MaxInFlightRequests is checked here).
3. Compose requests for the nodes in the filtered list, i.e. nodes that have a batch to send and are also ready to receive.
4. Increase InFlightRequests, send the requests, and get the responses to previous sends.
5. Handle all receives and decrease InFlightRequests.

When MaxInFlightRequest is set to 1, InFlightRequests is checked (step 2) before the receives are handled (step 5), so a node is still considered not ready even if its response has already arrived. A sequence of polls therefore falls into a PollForSend - PollForReceive - PollForSend ... pattern, which essentially halves the throughput on a fast network. Ideally we should check whether a node is ready after handling all the receives.

Here are some logs that show this situation when I run kafka-producer-perf-test locally:
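To illustrate the effect of the ordering, here is a toy model, not Kafka's Sender code. It assumes a single node and a fast network in which the response to a request sent in poll N is readable in poll N+1; the class and method names (InFlightPollModel, pollsToDrain, handleReceives) are hypothetical. It counts how many poll() iterations it takes to get every batch acknowledged under the current order (readiness check, send, then handle receives) and under the proposed order (handle receives first, then readiness check and send).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of one producer and one broker node; not Kafka's Sender.
// Assumption: the response to a request sent in poll N becomes readable in poll N+1.
public class InFlightPollModel {

    /**
     * Counts poll() iterations until every batch has been acknowledged.
     *
     * @param batches         number of batches waiting to be sent to the node
     * @param maxInFlight     in-flight request limit for the node
     * @param readyCheckFirst true: current order (readiness check, send, then handle receives);
     *                        false: proposed order (handle receives, then readiness check, send)
     */
    static int pollsToDrain(int batches, int maxInFlight, boolean readyCheckFirst) {
        // Each entry is the poll number at which that request's response becomes readable.
        Deque<Integer> inFlight = new ArrayDeque<>();
        int sent = 0;
        int acked = 0;
        int poll = 0;
        while (acked < batches) {
            poll++;
            if (!readyCheckFirst) {
                acked += handleReceives(inFlight, poll); // proposed: free in-flight slots first
            }
            // Readiness check: the node has data and is below the in-flight limit.
            if (sent < batches && inFlight.size() < maxInFlight) {
                inFlight.addLast(poll + 1); // send; the response is readable next poll
                sent++;
            }
            if (readyCheckFirst) {
                acked += handleReceives(inFlight, poll); // current: receives handled after the check
            }
        }
        return poll;
    }

    // Completes every request whose response is readable by this poll; returns how many completed.
    private static int handleReceives(Deque<Integer> inFlight, int poll) {
        int completed = 0;
        while (!inFlight.isEmpty() && inFlight.peekFirst() <= poll) {
            inFlight.pollFirst();
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) {
        int batches = 1000;
        System.out.println("readiness checked before receives: " + pollsToDrain(batches, 1, true) + " polls");
        System.out.println("readiness checked after receives:  " + pollsToDrain(batches, 1, false) + " polls");
    }
}
```

With maxInFlight = 1 this model needs 2000 polls for the current order (one send-only poll plus one receive-only poll per batch, as in the logs) and 1001 polls for the proposed order. With maxInFlight = 5 both orders take 1001 polls in this model, which is consistent with the problem being specific to MaxInFlightRequest = 1.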
-----1st poll for send, no receive------
[2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
------ 2nd poll for receive, no send------
[2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
------ 3rd poll for send, no receive------
[2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
---- 4th poll for receive, no send----
[2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
---- 5th poll for send, no receive----
[2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
---- 6th poll for receive, no send-----
[2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
.........

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)