Hi Mark,

it really doesn’t matter what I configure for “Max Poll Records” (tried 10, 
1’000, 100’000) or for “Max Uncommitted Time” (tried 1 s, 10 s, 100 s). The 
flowfile size is always randomly between 1 and about 500 records, so those two 
parameters don’t work at all in my case. The theory behind them is clear, but 
it doesn’t behave as expected… Of course the queue was more than full.

In the meantime I’ve done some tests with the ConsumeKafka processor, and the 
performance difference is again huge: 4 times better performance (1 million 
messages per second) with the same topic and number of threads for the non-record 
processor -> network limit reached. It seems that the RecordReader/RecordWriter 
part of the ConsumeKafkaRecord processor consumes a lot of CPU power. 
Interesting that nobody has complained about it until now; is anyone else that 
high, at a few 100’000 messages per second? We have sources which produce about 
200’000 messages/s, and we would like to be able to consume a few times faster 
than we produce.

We now plan to implement a KafkaAvroConsumer, based on the ConsumeKafka 
processor. It will consume from Kafka and write Avro out instead of the plain 
message with a demarcator. We hope to get the same great performance as with 
the ConsumeKafka processor.
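As a rough sketch of why the demarcator path is so much cheaper (illustrative only; `DemarcatorSketch` and `bundle` are invented names, not NiFi code): the non-record processor just concatenates raw message bytes into one FlowFile, with no per-record parsing or re-serialization.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class DemarcatorSketch {
    // Concatenate raw Kafka message bytes into one FlowFile body, separated
    // by a demarcator. No schema lookup, no decode/encode per record.
    static byte[] bundle(List<byte[]> messages, byte[] demarcator) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < messages.size(); i++) {
            if (i > 0) {
                out.writeBytes(demarcator);
            }
            out.writeBytes(messages.get(i));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] flowFileContent = bundle(
                List.of("msg1".getBytes(StandardCharsets.UTF_8),
                        "msg2".getBytes(StandardCharsets.UTF_8)),
                "\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(flowFileContent, StandardCharsets.UTF_8));
        // prints "msg1" and "msg2" on separate lines
    }
}
```

By contrast, the record path must deserialize every message through the RecordReader and re-serialize it through the RecordWriter, which is where the extra CPU goes.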

Cheers Josef

From: Mark Payne <marka...@hotmail.com>
Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
Date: Monday, 22 June 2020 at 15:03
To: "users@nifi.apache.org" <users@nifi.apache.org>
Subject: Re: ConsumeKafkaRecord Performance Issue

Josef,

The Max Poll Records value just gets handed to the Kafka client and limits how 
many records are pulled back in a single request. This can be important 
because if you set the value really high, you could potentially buffer up a lot 
of messages in memory and consume a lot of heap. But setting it too low would 
result in small batches. So that property can play a role in the size of a 
batch of records, but you should not expect the batches output to necessarily 
equal the value there.
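In other words, `max.poll.records` is an upper bound per `poll()`, not a guaranteed batch size. A minimal sketch of the relevant consumer configuration (the class/method names and broker address are invented for illustration):

```java
import java.util.Properties;

public class PollConfigSketch {
    // Build consumer properties where max.poll.records only caps how many
    // records a single poll() may return; the client can, and often does,
    // hand back far fewer, so FlowFile batch sizes vary.
    static Properties consumerProps(int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka.example.net:9093"); // placeholder broker
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps(100_000);
        System.out.println(p.getProperty("max.poll.records")); // prints 100000
    }
}
```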

What value do you have set for the “Max Uncommitted Time”? That can certainly 
play a role in the size of the FlowFiles that are output.

Thanks
-Mark



On Jun 22, 2020, at 2:48 AM, josef.zahn...@swisscom.com wrote:

Hi Mark,

thanks a lot for your explanation, it makes perfect sense! Did you also check 
the “Max Poll Records” parameter? Because no matter how high I set it, I always 
get a random number of records back in one flowfile. The maximum is about 400 
records, which isn’t ideal for small records, as NiFi ends up with a lot of 
FlowFiles of only a few kilobytes in case of a huge backlog.

Cheers Josef


From: Mark Payne <marka...@hotmail.com>
Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
Date: Friday, 19 June 2020 at 17:06
To: "users@nifi.apache.org" <users@nifi.apache.org>
Subject: Re: ConsumeKafkaRecord Performance Issue

Josef,

Glad you were able to get past this hurdle. The reason for the consumer 
yielding is a bit complex. NiFi issues an async request to Kafka to retrieve 
messages. Then, NiFi performs a long-poll to get those messages from the Kafka 
client. If the client returns 0 messages from the long-poll, the assumption 
that NiFi makes is that there are no more messages available from Kafka. So it 
yields to avoid hammering the Kafka server constantly when there are no 
messages available. Unfortunately, though, I have found fairly recently by 
digging into the Kafka client code that returning 0 messages happens not only 
when there are no messages available on the Kafka server but also if the client 
just takes longer than that long-poll (10 milliseconds) to receive the response 
and prepare the messages on the client side. The client doesn’t appear to 
readily expose any information about whether or not there are more messages 
available, so this seems to be the best we can do with what the client 
currently provides.
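The heuristic described above can be boiled down to a tiny sketch (illustrative only; `YieldSketch` and its names are invented, not NiFi’s actual implementation):

```java
public class YieldSketch {
    // The long-poll timeout described above (10 ms): an empty poll within
    // this window is taken to mean "no messages available", even when the
    // client was merely slow to prepare the response.
    static final long LONG_POLL_MILLIS = 10;

    // Yield heuristic: back off only when the poll returned nothing.
    // With Yield Duration set to 0 secs this back-off costs nothing,
    // which is why that setting gives the big throughput jump.
    static boolean shouldYield(int recordsReturned) {
        return recordsReturned == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldYield(0));   // true  -> processor yields
        System.out.println(shouldYield(500)); // false -> keep polling
    }
}
```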

So setting a yield duration of 0 seconds will provide much higher throughput 
but may put more load on the Kafka brokers.



On Jun 19, 2020, at 10:12 AM, josef.zahn...@swisscom.com wrote:

Hi Mark, Pierre

We are using NiFi 1.11.4, so fully up to date.

Are you kidding me :-D, “Yield Duration” was always at the default value (1 
sec), as I didn’t expect that the processor “yields”. But based on your comment 
I’ve changed it to “0 secs”. I can’t believe it: the performance increased to 
the same value (about 250’000 messages per second) that 
kafka-consumer-perf-test.sh shows. Thanks a lot!! However, 250k messages/s is 
still not enough to cover all our use cases, but at least it is now consistent 
with the Kafka performance testing script. The Kafka Grafana dashboard shows 
about 60 MB/s outgoing with the current number of messages.

@Pierre: Regarding the setup you mentioned with 10-20 million messages per 
second: how many partitions did it have, and how big were the messages? In this 
example we are storing the messages as Avro with about 44 fields.

Cheers Josef


PS: below some more information about my setup (even though our main issue has 
been solved):

As record reader I’m using an AvroReader which gets the schema from a Confluent 
schema registry. Every setting there is default, except the connection 
parameters to Confluent. As record writer I’m using AvroRecordSetWriter with a 
predefined schema, as we only want to keep a reduced column set.

The 8 servers use only SAS SSDs and don’t store the data locally. The data goes 
from ConsumeKafkaRecord directly into our DB, which runs on another cluster. As 
I mentioned already, the problem was there whether I was using “Primary Node” 
only or distributing across the cluster, so it wasn’t a limit on a single node.

<image001.png>



From: Mark Payne <marka...@hotmail.com>
Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
Date: Friday, 19 June 2020 at 14:46
To: "users@nifi.apache.org" <users@nifi.apache.org>
Subject: Re: ConsumeKafkaRecord Performance Issue

Josef,

Have you tried updating the processor’s Yield Duration (configure -> settings 
tab)? Setting that to “0 secs” can make a big difference in 
ConsumeKafka(Record)’s performance.

Also, what kind of data rate (MB/sec) are you looking at, and which record 
reader and writer are you using? Are you using a schema registry? Spinning disk 
or SSD?

All of these can make a big difference in performance.

Thanks
Mark


On Jun 19, 2020, at 3:45 AM, josef.zahn...@swisscom.com wrote:
Hi Chris

Our brokers are running Kafka 2.3.0, just slightly different from my 
kafka-consumer-perf-test.sh (2.3.1).

I’ve now also tested with the performance shell script from Kafka 2.0.0; it 
showed the same result as 2.3.1.

In my eyes at least 100k messages/s should easily be possible, especially with 
the number of threads NiFi offers… As we have sources which generate about 300k 
to 400k messages/s, NiFi is at the moment far too slow to even consume in real 
time, and it gets even worse: once we fall behind the offset, we can’t catch up 
anymore.

At the moment we can’t use NiFi to consume from Kafka.

Cheers Josef


From: christophe.mon...@post.ch
Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
Date: Friday, 19 June 2020 at 08:54
To: "users@nifi.apache.org" <users@nifi.apache.org>
Subject: RE: ConsumeKafkaRecord Performance Issue

Hi Josef

I noticed that you run the kafka-consumer-perf-test.sh of Kafka 2.3.1, but NiFi 
is bundled with kafka-clients-2.0.0.jar. Maybe you could try the performance 
test with the same client version?

What is the version of your kafka brokers?

Regards
Chris

From: josef.zahn...@swisscom.com
Sent: Friday, 19 June 2020 07:55
To: users@nifi.apache.org
Subject: ConsumeKafkaRecord Performance Issue

Hi guys,

We have noticed some strange behavior of the ConsumeKafkaRecord processor (and 
its counterpart ConsumeKafka). We have a Kafka topic with 15 partitions and a 
producer which inserts, via NiFi, about 40k records per second into the topic 
at peak. The thing is, it doesn’t matter whether we use the 8-node cluster or 
configure execution on “Primary Node”; the performance is terrible. We made a 
test with execution on “Primary Node” and started with one thread; you can see 
the result below. As soon as we reached 3 threads the performance went down and 
never got higher than that, no matter how many threads or cluster nodes. We 
tried 2 threads on the 8-node cluster (16 threads in total) and even more. It 
didn’t help; we were stuck at 12’000’000 – 14’000’000 records per 5 min 
(roughly 45k records per second). Btw., for the tests we were always behind the 
offset, so there were a lot of messages in the Kafka queue.

<image001.png>


We also tested with the performance script which comes with Kafka. It showed 
250k messages/s without any tuning at all (though of course without any 
decoding of the messages). So in theory Kafka and the network in between can’t 
be the culprit; it must be something within NiFi.

[user@nifi ~]$ /opt/kafka_2.12-2.3.1/bin/kafka-consumer-perf-test.sh 
--broker-list kafka.xyz.net:9093 --group nifi --topic events --consumer.config 
/opt/sbd_kafka/credentials_prod/client-ssl.properties --messages 3000000

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, 
nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-06-15 17:20:05:273, 2020-06-15 17:20:20:429, 515.7424, 34.0289, 3000000, 
197941.4093, 3112, 12044, 42.8215, 249086.6822


We have also seen that “Max Poll Records” never gets reached in our case; we 
had at most about 400 records in one flowfile even though we configured 
100’000, which could be part of the problem.

<image002.png>

It seems I’m not alone with this issue, even though that user’s performance was 
even worse than ours:
https://stackoverflow.com/questions/62104646/nifi-poor-performance-of-consumekafkarecord-2-0-and-consumekafka-2-0

Any help would be really appreciated.

If nobody has an idea, I’ll have to open a bug ticket :-(.

Cheers, Josef
