[ https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298106#comment-14298106 ]
Jay Kreps commented on KAFKA-1905:
----------------------------------
It's not every day you double performance by moving a few lines of code
around...
> KafkaProducer's performance could be halved when MaxInFlightRequest is set to
> 1
> -------------------------------------------------------------------------------
>
> Key: KAFKA-1905
> URL: https://issues.apache.org/jira/browse/KAFKA-1905
> Project: Kafka
> Issue Type: Bug
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get a list of nodes that have a batch available for sending.
> 2. Filter the list to remove nodes that are not ready to receive a new
> request (MaxInFlightRequests is checked here).
> 3. Compose the requests for the nodes in the filtered list, i.e. those that
> have a batch to send and are also ready to receive.
> 4. Increase InFlightRequests, send the requests, and get the responses of
> the previous send.
> 5. Handle all receives and decrease InFlightRequests.
> In this case, when MaxInFlightRequests is set to 1, since we check
> InFlightRequests before handling each receive, the node is still considered
> not ready even if we have already received the response. So for a sequence
> of polls, we end up in a PollForSend - PollForReceive - PollForSend...
> pattern, which essentially halves the throughput on a fast network. Ideally
> we should check whether a node is ready after we handle all the receives.
> Here are some logs that show this situation when I run
> kafka-producer-perf-test locally.
> -----1st poll for send, no receive------
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ------ 2nd poll for receive, no send------
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1
> (org.apache.kafka.clients.producer.internals.Sender)
> ------ 3rd poll for send, no receive------
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 4th poll for receive, no send----
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 5th poll for send, no receive----
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0,
> jqin-ld1.linkedin.biz, 9092)]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests:
> [ClientRequest(expectResponse=true,
> payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0,
> recordCount=15)},
> request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance},
> body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=15780 cap=16384]}]}]}))]
> (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0
> (org.apache.kafka.clients.producer.internals.Sender)
> ---- 6th poll for receive, no send-----
> [2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807
> (org.apache.kafka.clients.producer.internals.Sender)
> .........
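
The send/receive alternation described in the issue can be reproduced with a
toy model. The sketch below is not the Sender or NetworkClient code; the class
PollOrderSketch and the method simulate are hypothetical names made up for
illustration. It assumes a single node that always has a batch waiting, a
limit of one in-flight request, and a network fast enough that the response to
a request becomes readable on the very next poll.

```java
public class PollOrderSketch {
    // Assumed limit, mirroring MaxInFlightRequests = 1 from the issue.
    static final int MAX_IN_FLIGHT = 1;

    /**
     * Counts how many requests get sent over the given number of polls.
     * If receiveBeforeReadyCheck is false, readiness is checked before the
     * receives of the same poll are handled (the behaviour in the report).
     * If true, receives are handled first, as the report suggests.
     */
    static int simulate(int polls, boolean receiveBeforeReadyCheck) {
        int inFlight = 0;
        int pendingResponses = 0; // responses that become readable in the next poll
        int sent = 0;
        for (int i = 0; i < polls; i++) {
            // Responses to requests sent in earlier polls have arrived by now.
            int arrived = pendingResponses;
            pendingResponses = 0;

            if (receiveBeforeReadyCheck) {
                // Handle receives first so the node is free again.
                inFlight -= arrived;
                arrived = 0;
            }

            // Ready check: the node always has data, so only the limit matters.
            if (inFlight < MAX_IN_FLIGHT) {
                inFlight++;
                sent++;
                pendingResponses++;
            }

            // Handle any receives not yet handled in this poll.
            inFlight -= arrived;
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("ready check before receives: " + simulate(10, false) + " sends in 10 polls");
        System.out.println("ready check after receives:  " + simulate(10, true) + " sends in 10 polls");
    }
}
```

Under these assumptions the first ordering sends on every other poll, while
the second sends on every poll, which matches the halved throughput in the
logs above.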