I’m moving this issue from Stack Overflow to the Apache Kafka Users List:
http://stackoverflow.com/questions/34213272/kafka-producer-0-9-performance-issue-with-small-messages
There was some discussion on Stack Overflow, but I expect this list will be
better able to help identify the root cause, since it does not appear to be
specifically a Bluemix Message Hub issue. Thanks!
-----
We are observing very poor performance from the Kafka 0.9 Java producer client
when sending small messages. Records are not being accumulated into larger
request batches, so each small record is sent in its own request.
Is something wrong with our client configuration, or is this some other issue?
-----
We are using Kafka client 0.9.0.0. We did not see any related issues in the
fixed or unresolved lists for the unreleased 0.9.0.1 or 0.9.1 releases, so we
are focused on our client configuration and server instance.
We understand that linger.ms should cause the client to accumulate records into
a batch. We set linger.ms to 10 (and also tried 100 and 1000), but none of these
caused records to be accumulated into batches. With a record size of about 100
bytes and a batch size of 16384 bytes (the default batch.size), we would have
expected roughly 160 messages to be sent in a single request.
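For reference, the settings we expect to drive batching reduce to the following
(a minimal sketch; batch.size is simply left at its 16384-byte default, as the
merged configuration further down confirms):
<pre>
// Fragment: batching-related producer settings (uses java.util.Properties).
Properties props = new Properties();
props.put("linger.ms", "10");      // also tried 100 and 1000
props.put("batch.size", "16384");  // default: max bytes per partition batch
// Expectation: 16384 bytes / ~100-byte records ≈ 160 records accumulated per request.
</pre>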
The client trace seems to indicate that the partition may be full, despite the
service being a freshly allocated Bluemix Message Hub (Kafka 0.9 server)
instance. The test client sends multiple messages in a loop with no other I/O.
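For context, the send loop is essentially the following (a simplified sketch,
not the exact ExploreProducer code; the topic, key, and value format match the
trace below, and the metadata returned for each send is logged):
<pre>
// Simplified sketch of the test loop. Assumes 'props' holds the configuration
// shown later in this message (checked exceptions omitted for brevity).
// Uses org.apache.kafka.clients.producer.* and java.nio.charset.StandardCharsets.
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
for (int i = 1; i <= 100; i++) {
    String value = String.format(
            "Kafka 0.9 Java Client Record Test Message %05d %s",
            i, java.time.OffsetDateTime.now());
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            "mytopic",
            "records".getBytes(StandardCharsets.UTF_8),
            value.getBytes(StandardCharsets.UTF_8));
    // The returned metadata is retrieved and logged for each record,
    // as in the "Send returned metadata" trace lines below.
    RecordMetadata metadata = producer.send(record).get();
}
producer.close();
</pre>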
-----
The log shows a repeating sequence with a suspect line: "**Waking up the sender
since topic mytopic partition 0 is either full or getting a new batch**".
The newly allocated partition should be essentially empty in our test case, so
why would the producer be getting a new batch for every record?
<pre>
2015-12-10 15:14:41,335 3677 [main] TRACE
com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic',
Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011
2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE
org.apache.kafka.clients.producer.KafkaProducer - Sending record
ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af,
value=[B@4923ab24 with callback null to topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE
org.apache.kafka.clients.producer.internals.RecordAccumulator - Allocating a
new 16384 byte message buffer for topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE
org.apache.kafka.clients.producer.KafkaProducer - Waking up the sender since
topic mytopic partition 0 is either full or getting a new batch
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer]
TRACE org.apache.kafka.clients.producer.internals.Sender - Nodes with data
ready to send: [Node(0,
kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer]
TRACE org.apache.kafka.clients.producer.internals.Sender - Created 1 produce
requests: [ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963,
request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer},
body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer]
TRACE org.apache.kafka.clients.producer.internals.Sender - Received produce
response from node 0 with correlation id 11
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer]
TRACE org.apache.kafka.clients.producer.internals.RecordBatch - Produced
messages to topic-partition mytopic-0 with base offset offset 130 and error:
null.
2015-12-10 15:14:41,412 3754 [main] TRACE
com.isllc.client.producer.ExploreProducer - Send returned metadata:
Topic='mytopic', Partition=0, Offset=130
2015-12-10 15:14:41,412 3754 [main] TRACE
com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic',
Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012
2015-12-10T15:14:41.412-05:00'
</pre>
Log entries like the above repeat for each record sent.
------
We provided the following properties file:
<pre>
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient -
Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient -
acks=-1
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient -
ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient -
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient -
security.protocol=SASL_SSL
</pre>
In addition, linger.ms=10 is set in code rather than in the properties file.
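Roughly, the file and the programmatic setting are combined like this before
the producer is constructed (a sketch; the file path and exception handling are
simplified):
<pre>
// Sketch: load kafka-producer.properties, then add linger.ms in code.
// Uses java.io.FileInputStream, java.io.InputStream, java.util.Properties.
Properties props = new Properties();
try (InputStream in = new FileInputStream("kafka-producer.properties")) {
    props.load(in);               // the settings logged above
}
props.put("linger.ms", "10");     // added in code, not present in the file
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
</pre>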
-----
The Kafka client logs the expanded/merged configuration, which confirms that
the linger.ms setting was applied:
<pre>
2015-12-10 15:14:37,970 312 [main] INFO
org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers =
[kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,
kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,
kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,
kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,
kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [hidden]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = ExploreProducer
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
acks = -1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
retries = 0
max.request.size = 1048576
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location =
/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 10
</pre>
-----
The Kafka metrics after sending 100 records:
<pre>
Duration for 100 sends 8787 ms. Sent 7687 bytes.
batch-size-avg = 109.87 [The average number of bytes sent per partition
per-request.]
batch-size-max = 110.0 [The max number of bytes sent per partition
per-request.]
buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory
that is not being used (either unallocated or in the free list).]
buffer-exhausted-rate = 0.0 [The average per-second number of record sends
that are dropped due to buffer exhaustion]
buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the
client can use (whether or not it is currently used).]
bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for
space allocation.]
byte-rate = 291.8348916277093 []
compression-rate = 0.0 []
compression-rate-avg = 0.0 [The average compression rate of record
batches.]
connection-close-rate = 0.0 [Connections closed per second in the window.]
connection-count = 2.0 [The current number of active connections.]
connection-creation-rate = 0.05180541884681138 [New connections established
per second in the window.]
incoming-byte-rate = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent
doing I/O]
io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per
select call in nanoseconds.]
io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread
spent waiting.]
io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the
I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
metadata-age = 8.096 [The age in seconds of the current producer metadata
being used.]
network-io-rate = 5.2937784999213795 [The average number of network
operations (reads or writes) on all connections per second.]
outgoing-byte-rate = 451.2298783403283 []
produce-throttle-time-avg = 0.0 [The average throttle time in ms]
produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
record-error-rate = 0.0 [The average per-second number of record sends that
resulted in errors]
record-queue-time-avg = 15.5 [The average time in ms record batches spent
in the record accumulator.]
record-queue-time-max = 434.0 [The maximum time in ms record batches spent
in the record accumulator.]
record-retry-rate = 0.0 []
record-send-rate = 2.65611304417116 [The average number of records sent per
second.]
record-size-avg = 97.87 [The average record size]
record-size-max = 98.0 [The maximum record size]
records-per-request-avg = 1.0 [The average number of records per request.]
request-latency-avg = 0.0 [The average request latency in ms]
request-latency-max = 74.0 []
request-rate = 2.6468892499606897 [The average number of requests sent per
second.]
request-size-avg = 42.0 [The average size of all requests in the window..]
request-size-max = 170.0 [The maximum size of any request sent in the
window.]
requests-in-flight = 0.0 [The current number of in-flight requests awaiting
a response.]
response-rate = 2.651196976060479 [The average number of responses received
per second.]
select-rate = 10.989861465830819 [Number of times the I/O layer checked for
new I/O to perform per second]
waiting-threads = 0.0 [The number of user threads blocked waiting for
buffer memory to enqueue their records]
</pre>
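For reference, a snapshot like the one above can be taken directly from the
producer's metrics map; a minimal sketch of the dump (presumably close to how
ours was produced):
<pre>
// Sketch: print each producer metric as "name = value [description]".
// Uses org.apache.kafka.common.Metric and org.apache.kafka.common.MetricName.
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    MetricName name = entry.getKey();
    System.out.printf("%s = %s [%s]%n",
            name.name(), entry.getValue().value(), name.description());
}
</pre>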
Thanks
Gary Gershon
Principal, Intermedia Sciences LLC
(908) 969-1119
[email protected]