Jiangjie Qin created KAFKA-1905:
-----------------------------------
Summary: KafkaProducer's performance could be halved when
MaxInFlightRequest is set to 1
Key: KAFKA-1905
URL: https://issues.apache.org/jira/browse/KAFKA-1905
Project: Kafka
Issue Type: Bug
Reporter: Jiangjie Qin
In KafkaProducer, the following logic is used in each poll():
1. Get a list of nodes that have a batch available for sending.
2. Filter the list to remove any node that is not ready to receive a new
request (MaxInFlightRequests is checked here).
3. Compose the requests for the nodes in the filtered list, i.e. those that
have a batch to send and are also ready to receive.
4. Increase InFlightRequests, send the requests and get the responses of
the previous send.
5. Handle all receives and decrease InFlightRequests.
When MaxInFlightRequest is set to 1, this ordering is a problem:
InFlightRequests is checked before the receives are handled, so even if the
response has already arrived, the node is still considered not ready. A
sequence of polls therefore ends up in a PollForSend - PollForReceive -
PollForSend... pattern, which essentially halves the throughput on a fast
network. Ideally we should check whether a node is ready after we have
handled all the receives.
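
The effect of the ordering can be illustrated with a toy simulation. This is
only a minimal sketch, not the actual Sender/NetworkClient code: the function
simulate_polls, its receive_before_ready_check flag, and the assumption that a
response to a request is readable by the next poll (a "fast network") are all
hypothetical and exist only for illustration. It models a single node that
always has a batch available.

```python
def simulate_polls(num_polls, max_in_flight=1, receive_before_ready_check=False):
    """Toy model of the poll loop for one node that always has data to send.

    Assumption (not from the Kafka code): a response to a request sent in
    one poll is available to be read in the next poll.
    Returns a list of (requests_sent, responses_handled) tuples, one per poll.
    """
    in_flight = 0  # requests sent whose responses have not been handled yet
    arrived = 0    # responses that have arrived but have not been read yet
    trace = []
    for _ in range(num_polls):
        received = 0
        if receive_before_ready_check:
            # Proposed ordering: handle receives first, then check readiness.
            received, arrived = arrived, 0
            in_flight -= received

        # Steps 1-4: the node is ready only if it is below the in-flight limit.
        sent = 0
        if in_flight < max_in_flight:
            in_flight += 1
            sent = 1

        if not receive_before_ready_check:
            # Reported ordering (step 5): receives are handled after the check.
            received, arrived = arrived, 0
            in_flight -= received

        arrived += sent  # the response will be readable by the next poll
        trace.append((sent, received))
    return trace


current = simulate_polls(6)
proposed = simulate_polls(6, receive_before_ready_check=True)
print("current :", current)
print("proposed:", proposed)
print("requests sent:", sum(s for s, _ in current), "vs", sum(s for s, _ in proposed))
```

Under these assumptions the reported ordering alternates between a send-only
poll and a receive-only poll, while handling receives before the readiness
check lets every poll send. In this model, raising max_in_flight to 2 also
avoids the alternation, which is consistent with the problem being specific to
a limit of 1.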
Here are some logs that show this situation when I run
kafka-producer-perf-test locally.
-----1st poll for send, no receive------
[2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0,
jqin-ld1.linkedin.biz, 9092)]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO Created 1 produce requests:
[ClientRequest(expectResponse=true,
payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
recordCount=15)},
request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance},
body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
lim=15780 cap=16384]}]}]}))]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 0
(org.apache.kafka.clients.producer.internals.Sender)
------ 2nd poll for receive, no send------
[2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,009] INFO responses #: 1
(org.apache.kafka.clients.producer.internals.Sender)
------ 3rd poll for send, no receive------
[2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0,
jqin-ld1.linkedin.biz, 9092)]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO Created 1 produce requests:
[ClientRequest(expectResponse=true,
payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
recordCount=15)},
request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance},
body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
lim=15780 cap=16384]}]}]}))]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 0
(org.apache.kafka.clients.producer.internals.Sender)
---- 4th poll for receive, no send----
[2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,010] INFO responses #: 1
(org.apache.kafka.clients.producer.internals.Sender)
---- 5th poll for send, no receive----
[2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0,
jqin-ld1.linkedin.biz, 9092)]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO Created 1 produce requests:
[ClientRequest(expectResponse=true,
payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
recordCount=15)},
request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance},
body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
lim=15780 cap=16384]}]}]}))]
(org.apache.kafka.clients.producer.internals.Sender)
[2015-01-28 13:54:06,011] INFO responses #: 0
(org.apache.kafka.clients.producer.internals.Sender)
---- 6th poll for receive, no send-----
[2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807
(org.apache.kafka.clients.producer.internals.Sender)
.........