Hi Eric,

You've given hardware information about your brokers, but I don't think you've provided any about the machine your producer is running on. Have you verified that you're not hitting any resource caps on the producer's machine?
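If it helps narrow that down, the producer client exposes its own metrics (average batch size, time records sit in the accumulator, request latency, free buffer space), which usually make a client-side bottleneck obvious. Below is a minimal sketch for dumping them during the load test; the class name is mine, and it assumes you can reach the KafkaProducer instance from your Jetty handler:

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerMetricsDump {
    // Print the client-level producer metrics (batch-size-avg, record-queue-time-avg,
    // request-latency-avg, buffer-available-bytes, ...) so you can see whether the
    // producer itself is the bottleneck before looking at the brokers.
    public static void dump(KafkaProducer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("producer-metrics".equals(name.group())) {
                System.out.printf("%-45s %s%n", name.name(), entry.getValue().metricValue());
            }
        }
    }
}

Roughly speaking, if record-queue-time-avg is large while request-latency-avg stays small, the time is going into batching/buffering on the client rather than into the brokers.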
I also think you might be hitting the limit of what a single producer can push through with your current settings. With a record size of ~12 KB and your configured batch.size of 64 KB, each batch holds at most 5 records. The default max.in.flight.requests.per.connection is 5, so at any given time you have at most 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas; with acks=all the producer waits for each batch to be fully replicated before it considers the send complete.

Doing the math: you see ~200 records per second, split between 2 brokers, so ~100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that works out to ~250 ms to move around 300 KB. At minimum, that time covers compressing the batch, sending the batch over the network to the leader, writing the batch to the leader's log, the replica fetching the batch over the network, writing the batch to the replica's log, and all of the assorted responses to those calls. I've put a rough producer-config sketch below the quoted thread showing the direction I'd experiment in.

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
> Hi Jamie,
> Thanks for the hint. I played with these parameters, and found only
> linger.ms plays a significant role for my test case.
> It is very sensitive and highly non-linear.
> I get these results:
>
> linger.ms    messages per second
> 80           100
> 84           205
> 85           215  -> top
> 86           213
> 90           205
> 95           195
> 100          187
> 200          100
>
> So as you can see, this is very sensitive and one can miss the peak easily.
> However, 200 messages per second for 2 powerful nodes and a relatively
> small message (12016 bytes) is still at least 10X below what I would have
> hoped. When I see system resources barely moving, with CPU at 3%, I am
> sure something is not right.
> Regards,
> Eric
>
> -----Original Message-----
> From: Jamie <jamied...@aol.co.uk.INVALID>
> Sent: Wednesday, October 2, 2019 4:27 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> External
>
> Hi Eric,
> I found that increasing linger.ms to between 50 and 100 ms significantly
> increases performance (fewer, larger requests instead of many small ones).
> I'd also increase the batch size and the buffer.memory.
> Thanks,
> Jamie
>
> -----Original Message-----
> From: Eric Owhadi <eric.owh...@esgyn.com>
> To: users@kafka.apache.org <users@kafka.apache.org>
> Sent: Wed, 2 Oct 2019 16:42
> Subject: poor producing performance with very low CPU utilization?
>
> Hi Kafka users,
> I am new to Kafka and am struggling to get an acceptable producing rate.
> I am using a cluster of 2 nodes, 40 CPU cores / 80 counting hyperthreading,
> 256 GB of memory, on a 10 Gbit network. Kafka is installed as part of a
> Cloudera parcel, with a 5 GB Java heap.
> Producer version: Kafka client 2.2.1
>
> Wed Oct 2 07:56:59 PDT 2019
> JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> Using -XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof
> -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> Using scripts/control.sh as process script
> CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CMF_CONF_DIR=/etc/cloudera-scm-agent
>
> Date: Wed Oct 2 07:56:59 PDT 2019
> Host: xxxxx.esgyn.local
> Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> Zookeeper Chroot:
> PORT: 9092
> JMX_PORT: 9393
> SSL_PORT: 9093
> ENABLE_MONITORING: true
> METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> BROKER_HEAP_SIZE: 5120
> BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
> -Djava.awt.headless=true
> BROKER_SSL_ENABLED: false
> KERBEROS_AUTH_ENABLED: false
> KAFKA_PRINCIPAL:
> SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> SUPER_USERS: kafka
> Kafka version found: 2.2.1-kafka4.1.0
> Sentry version found: 1.5.1-cdh5.15.0
> ZK_PRINCIPAL_NAME: zookeeper
> Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> security.inter.broker.protocol inferred as PLAINTEXT
> LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
>
> I am producing messages of 12016 bytes uncompressed, which are then snappy
> compressed by Kafka.
> I am using a topic with 200 partitions, and a custom partitioner that I
> verified is doing a good job of spreading the load across the 2 brokers.
>
> My producer config looks like:
>
> kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> kafkaProps.put("compression.type", "snappy");
> kafkaProps.put("batch.size", "65536");
> kafkaProps.put("acks", "all");
> kafkaProps.put("linger.ms", "1");
>
> I first tried fire-and-forget send, thinking I would get the best performance.
> Then I tried synchronous send, and amazingly found that I would get better
> performance with sync send.
> However, after 1 or 2 minutes of load test, I start getting errors on the
> synchronous send like this:
>
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
> DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed
> since batch creation
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
>         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
>         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>         at org.eclipse.jetty.server.Server.handle(Server.java:505)
>         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
>         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
>         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
>         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
>         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
>         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
> record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms
> has passed since batch creation
>
> So I am suspecting that I am producing too fast and the brokers cannot
> catch up. I tried bumping the io threads from the default 8 to 40; that
> did not help.
>
> I am getting a producing rate of only about 100 messages per second, and
> about 1 megabyte per second according to Kafka metrics.
> The CPU utilization is barely noticeable (3%), the network is ridiculously
> unaffected, and having googled around, this is not the kind of performance
> I should expect from my config. I was hoping for at least 10X more, if not
> 100X. Were my expectations too high, or am I missing something in the
> config that is causing these performance numbers?
>
> Some details: I produce using a Jetty custom handler that I verified to be
> super fast when I am not producing (commenting out the send()), and I am
> using a single producer (I also tried with 2) reused across all Jetty
> threads.
>
> Any help/clue would be much appreciated. Thanks in advance,
> Eric Owhadi
> Esgyn Corporation.
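Here is the producer-config sketch I mentioned above. It only shows the direction I'd experiment in, not exact values: the batch.size, linger.ms and buffer.memory numbers are illustrative guesses, and I've swapped in a plain byte[] value serializer just to keep the snippet self-contained (your real setup would keep SmartpumpCollectorVectorSerializer and the custom partitioner).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class TunedProducerSketch {
    public static KafkaProducer<Long, byte[]> build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "nap052.esgyn.local:9092,localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        // Plain byte[] values here only to keep the sketch self-contained.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // keep durability; adds a replication wait per batch

        // With ~12 KB records, a 64 KB batch carries only ~5 records, so each
        // request amortizes the broker round trip over very little data.
        // Illustrative values: a much larger batch plus a longer linger lets
        // one request carry dozens of records instead of 5.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 512 * 1024);            // 512 KB (example)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);                     // ms (example; you found ~85 ms best with 64 KB batches)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 128L * 1024 * 1024); // 128 MB so send() doesn't block on buffer space

        return new KafkaProducer<>(props);
    }
}

The other axis worth testing is running several producer instances in parallel, since the in-flight limit discussed above applies per connection.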