Nathan,

Hopefully this finds your issues resolved. If not, I do have another
direction to investigate. In my experience, it's not enough to just move
your content repo to SSD; generally, I only see a gain if both the
provenance repo and the content repo are on disks of the same speed. But I
might be biased, as my flows take in high volumes and have a high number of
provenance inserts to process. You can test this (if you're comfortable not
recording your provenance for a while) by switching the
implementation of your provenance repository to volatile (see here
for more details -
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository).
It simply keeps a set number of records in memory (see here for how to
configure it -
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties),
which removes any potential disk bottleneck for the provenance repo. I wouldn't
want to run a production cluster like this, and keep in mind it will impact
your heap space (especially if you set the number of buffered records
high), but it would be a starting point to determine whether your bottleneck
is related to disk IO for the provenance repository storage.
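
For reference, a minimal sketch of the nifi.properties changes involved
(property names as documented in the admin guide linked above; the buffer
size below is only an illustrative value and directly affects heap usage):

    nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
    nifi.provenance.repository.buffer.size=100000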

v/r,

Eric O'Reilly

On Fri, Sep 25, 2020 at 7:10 AM <nathan.engl...@bt.com> wrote:

> Hi Matt,
>
>
>
> We’ve now switched to the JoltTransformRecord processors. They do seem to
> perform slightly better.
>
>
>
> We are trying to switch out the ConsumeKafka_0_10 and ConvertRecord
> processors to the ConsumeKafkaRecord_0_10 processor based on feedback in
> this chain as well.
>
>
>
> With the ConvertRecord processor, we could use the kafka.topic attribute
> because the ConsumeKafka processor had already completed and set it on the
> flow file. ConsumeKafkaRecord handles it all in one step, and when testing
> it failed to parse because kafka.topic is not set at that point.
>
>
>
> As we are consuming from multiple topics, each with a unique Avro schema,
> the only way I can see to do this would be to have a separate processor and
> AvroReader controller service for each topic?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>
>
> *From:* Matt Burgess [mailto:mattyb...@apache.org]
> *Sent:* 24 September 2020 17:08
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan,
>
>
>
> If you have multiple JSON messages in one flow file, is it in one large
> array, or a top-level JSON object with an array inside? Also are you trying
> to transform each message or the whole thing (i.e. do you need to know
> about more than one message at a time)? If you have a top-level array and
> are transforming each element in the array, you might get better
> performance out of JoltTransformRecord rather than JoltTransformJSON, as
> the latter reads the entire file into memory as a single JSON entity. If
> you have a top-level object then they will both read the whole thing in.
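>
> For example (illustrative payloads only, not your actual data), the first
> shape below is a top-level array that can be transformed record by record,
> while the second is a top-level object that will be read in whole either
> way:
>
>     [ {"id": 1, "value": "a"}, {"id": 2, "value": "b"} ]
>
>     { "messages": [ {"id": 1, "value": "a"}, {"id": 2, "value": "b"} ] }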
>
>
>
> Regards,
>
> Matt
>
>
>
>
>
> On Thu, Sep 24, 2020 at 10:25 AM <nathan.engl...@bt.com> wrote:
>
> Hi Mark,
>
>
>
> From what I can see (based on queues building up before the processor and
> being basically empty after it), it is the Jolt processor we have problems
> with. We have tried adding more concurrency, reducing the run schedule, and
> increasing the run duration, but it didn’t seem to resolve the high CPU
> load (~32 when processing at the rates described in my first email; when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:marka...@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.engl...@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator, as we had
> issues getting it to batch. I think there may be a slight misunderstanding
> between what I meant and how my description sounds.
>
>
>
> When I say an empty string, the message demarcator isn’t simply left blank.
> I have used the checkbox ‘Set empty string’, which (from memory) means the
> processor treats the field as an empty-string value rather than null. If I
> left the field empty (checkbox not selected), it was one Kafka message to
> one flow file, which was a massive bottleneck.
>
>
>
> I also seem to remember, from when I looked at the code, that the
> ConsumeKafkaRecord processors default the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double-check the code, but since
> you said the demarcator is an empty string, I think that means it is not
> batching and is putting one message in one flow file. Basically, if a
> demarcator is not set then the batch size is ignored.
>
>
>
> Regarding the processors/tasks, let's take one topic with 11 partitions as
> an example. If you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3-node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM <nathan.engl...@bt.com> wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2,000
> messages, which does seem to result in more than one Kafka message per flow
> file.
>
>
>
> Completely understood on the load balancing; we tried several iterations
> of one task per topic partition. However, we still found the load skewed
> towards one specific node. I will try splitting it into multiple processors
> to see if that handles it any better. We have 10 topics with 11 partitions
> (and one topic with 2 partitions). So I should set all concurrent tasks to 1
> with multiple processors (one processor per topic)?
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com]
> *Sent:* 24 September 2020 13:59
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> I'm not sure how many topics you have, but the biggest improvement would
> probably be to not do a single message per flow file. You want to batch
> together lots of messages coming out of ConsumeKafka using a demarcator,
> and then convert/transform them in batches. You may need to have a separate
> consumer processor for each topic in order to do this correctly.
>
>
>
> You generally shouldn't need to use a load balanced connection after
> ConsumeKafka. If one node is being favored, it is likely that the number of
> partitions in your topic is not lining up with the # of NiFi nodes x # of
> concurrent consumer tasks. In the simplest case, if your topic had one
> partition, and you have 3 NiFi nodes with ConsumeKafka with 1 concurrent
> task each, then all your messages will only get consumed on one of the
> nodes. If you have 3 partitions then it should be roughly equal. In your
> case you have 3 nodes with concurrent tasks set to 12, so you have
> potentially 36 consumers, which means if you have anything less than 36
> partitions then it is not going to be balanced equally.
>
>
>
> On Thu, Sep 24, 2020 at 3:54 AM <nathan.engl...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> The RAIDs seem to give us good IOPS numbers when we’ve tested them. We have
> seen a 300 ms wait time on the content repo, which is why we have tried
> SSDs for the content repo, as we assumed that was the bottleneck. The other
> RAIDs seemed OK to us.
>
>
>
> I will certainly look into the ConsumeKafkaRecord processor today and will
> come up with a solution to use it. I will feed back on what I find.
>
>
>
> Regarding our current flow configuration, we have the following
> high-level process groups:
>
> ·         ConsumeData – Consumes the data from Kafka, adds some
> additional attributes, and converts the content from Avro to JSON
>
> ·         TransformData – This is where the majority of the work happens.
> Each flow file is routed on an attribute (derived from the kafka.topic
> attribute) to a sub-process group for its type of data, where the
> processing happens (explained in more detail below)
>
> ·         ProduceData – Publishes the record back to Kafka using the
> PublishKafkaRecord processor, adds some attributes to calculate the
> processing duration, and logs this out
>
> ·         Error Handling – A general catch-all. Luckily we don’t see this
> triggered very often, but it logs a message and writes the file to disk.
>
> <image001.png>
>
>
>
> *ConsumeData* process group:
>
> ·         ConsumeKafka, as mentioned in earlier emails, consumes the
> messages from multiple Kafka topics with a Message Demarcator of an empty
> string and Max Poll Records set to 2,000.
>
> ·         The Consumed Flows queue uses Round Robin load balancing, as we
> have found one node is favoured for the consumption from Kafka, so we
> wanted to distribute the load.
>
> ·         AddAttributes uses UpdateAttribute to add additional
> information to the flow file (data type, output Kafka topic) based on the
> kafka.topic attribute (a hypothetical example of such a rule is sketched
> after this list).
>
> ·         Finally, ConvertRecord converts the content of the flow file
> from Avro to JSON using the AvroReader and JsonRecordSetWriter controller
> services. In addition, we use the built-in AvroSchemaRegistry.
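>
> As a purely hypothetical illustration of that UpdateAttribute step (not
> our real rules), a property such as output.topic could be derived from
> kafka.topic with Expression Language, for example:
>
>     output.topic = ${kafka.topic:append('-processed')}
>     data.type    = ${kafka.topic:getDelimitedField(2, '.')}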
>
> <image002.png>
>
>
>
> *TransformData* process group. Due to the size, it’s not easy to show in
> a screenshot.
>
> ·         Flow files are routed by attribute to 1 of 10 processing
> groups (based on the kafka.topic attribute)
>
> ·         Each data type has its own processing group; I have gone into
> detail on one of them below
>
> ·         There are two failure output ports: one for initial routing
> issues, another for any errors that happen in the sub-processing
> groups. There is also the Successful output port.
>
>
>
> <image003.png>
>
>
>
>
>
> *HTTP Data Type Sub-processing group (Inside TransformData):*
>
> ·         The sub-processing groups all follow a somewhat similar layout
> but have different custom processors after the initial Jolt transformation
>
> ·         The Jolt transformation does the majority of the heavy lifting
> in converting the message into the right format to match the output Avro
> schema requirements. The Jolt spec contains a chain of operations including
> modify-overwrite-beta, shift, and default (a skeletal sketch of that shape
> is shown after this list)
>
> ·         Each processor after the Jolt transformation does something
> different to meet a specific requirement not possible with other NiFi
> processors. Some, like the IPAddressProcessor, are reused for other data
> types. We are planning to move to a single post-processing group, to which
> all data types are routed before being published to Kafka.
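>
> Purely for illustration, the chained spec's overall shape (with the actual
> spec bodies left empty here) looks like:
>
>     [
>       { "operation": "modify-overwrite-beta", "spec": {} },
>       { "operation": "shift", "spec": {} },
>       { "operation": "default", "spec": {} }
>     ]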
>
> <image004.png>
>
>
>
> *ProduceData* produces the messages back to different Kafka topics, as
> defined in the output.topic attribute (set in the ConsumeData stage):
>
> ·         PublishKafkaRecord converts the message from JSON back to Avro
> (a different schema to the input) using the JsonTreeReader and
> AvroRecordSetWriter controller services. We have set it to Guarantee Single
> Node Delivery, and use Snappy compression.
>
> ·         Set Exit Attributes adds a final processing time to the flow
> file
>
> ·         Calculate Duration uses this to work out the overall
> processing time
>
> ·         Then it’s finally logged to file so we can analyse the full
> processing time.
>
> <image005.png>
>
>
>
> Below are the scheduling settings we are using for the processors
> (Concurrent Tasks / Run Duration / Yield Duration, plus any other
> settings):
>
> Consume Data
>   ConsumeKafka            12 / 0 / 1 ms    (Message Demarcator = "Empty
>                                             string set", Max Poll Records
>                                             = 2,000)
>   AddAttributes           10 / 0 / 1 s
>   ConvertRecord           10 / 0 / 1 s
>
> TransformData
>   RouteOnAttribute        10 / 0 / 1 s
>   HTTP Jolt                5 / 0 / 200 ms
>   HTTP Post Processor      2 / 0 / 1 s
>
> Produce Data
>   PublishKafka            10 / 0 / 1 s     (Max Request Size = 1 MB,
>                                             Compression Type = snappy,
>                                             batch.size = 80,000 and
>                                             linger.ms = 50,000, both based
>                                             on some custom producers we
>                                             have written in the past)
>   Set Exit Attributes     10 / 0 / 1 s
>   Calculate Duration      10 / 0 / 1 s
>   Log Success             10 / 0 / 1 s
>
>
>
> The queue configs are as follows (Back Pressure Object Threshold / Back
> Pressure Size Threshold / Load Balance):
>
> After ConsumeKafka                          20,000 / 1 GB / Round Robin
>
> After Route On Attribute,
> All Other Queues In TransformData           10,000 / 1 GB / No
>
> All Failure Routes                          10,000 / 1 GB / No
>
> Between ConsumeData And TransformData,
> Between TransformData And ProduceData,
> All Other Queues In ConsumeData,
> Before Route On Attribute,
> All Queues In ProduceData                   20,000 / 1 GB / No
>
>
>
> We also have a Maximum Timer Driven Thread Count of 200 set.
>
>
>
> Stateless NiFi would fit this perfectly, but from what I have read, it’s
> not available in v1.9.2? We are stuck on 1.9.2 as we are using the
> ZooKeeper provided as part of a Cloudera cluster. However, I’m wondering if
> we would be better suited to the volatile repositories than writing to
> disk?
>
>
>
> Thanks again for the advice so far, Joe; you’ve given us some confidence
> that it’s us doing something wrong and not something with NiFi.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.w...@gmail.com]
> *Sent:* 23 September 2020 19:05
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> Not sure what read/write rates you'll get in these RAID-10 configs but
> generally this seems like it should be fine (100s of MB/sec per node range
> at least).  Whereas now you're seeing about 20MB/sec/node.  This is
> definitely very low.
>
>
>
> If you review
> http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html
>  then you'll see that we do actually capture attributes such as
> kafka.topic and so on.  Flowfiles would also be properly grouped by that.
> What I'm not positive of is whether it could handle reading from multiple topics at
> the same time while also honoring and determining each of their distinct
> schemas.  Would need to test/verify that scenario to be sure.  If you do
> have a bunch of topics and they could grow/change then keeping this single
> processor approach makes sense.  If you can go the route of one
> ConsumeKafkaRecord per topic then obviously that would work well.
>
>
>
> Not seeing your flow, though, I cannot be certain where the bottleneck(s)
> exist or provide guidance.  But this is without a doubt a vital skill for
> achieving maximum performance.
>
>
>
> You'd have to show/share a ton more details for folks here to be helpful
> in walking through the full design.  Or explain the end to end flow.
>
>
>
> As additional food for thought, if the flows are indeed 'from Kafka ->
> do stuff -> back to Kafka', this is likely a great use case for
> stateless NiFi.
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 10:43 AM <nathan.engl...@bt.com> wrote:
>
> Hi Joe,
>
>
>
> Thanks for getting back to me so quickly.
>
>
>
> Our disk setup is as follows:
>
> Path      Storage Type                         Format  Capacity  Content
> /         100GB OS SSD                         ext4    89.9GB    OS, NiFi install, logs
> /data/1/  2 x 4TB SAS hard drives in RAID 1    ext4    3.7TB     Database and FlowFile repos
> /data/2/  8 x 4TB SAS hard drives in RAID 10   ext4    14.6TB    Content repo
> /data/3/  2 x 4TB SAS hard drives in RAID 1    ext4    3.7TB     Provenance repo
> /ssd      1 x 4TB PCIe NVMe SSD                ext4    3.7TB     Content repo (used instead of
>                                                                  /data/2/ as a test, to see if
>                                                                  the CPU was bottlenecked by
>                                                                  disk operations)
>
>
>
> I will certainly take a look at those. One question with the consume
> record processor is how I would consume from multiple topics and ensure the
> correct Avro schema is used to deserialise each message? We have a 1:1
> mapping of schemas to topics. At the moment the ConsumeKafka processor is
> reading from all topics in one consumer. I’m assuming the kafka.topic
> attribute doesn’t exist at this stage? We use the AvroSchemaRegistry
> controller service as we don’t have a schema registry in place yet.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Joe Witt [mailto:joe.w...@gmail.com]
> *Sent:* 23 September 2020 17:33
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Nathan
>
>
>
> You have plenty powerful machines to hit super high speeds, but what I
> cannot tell is how the disks are set up, capability- and layout-wise,
> relative to our three repos of importance.  You'll need to share those
> details.
>
>
>
> That said, the design of the flow matters.  The Kafka processors that
> aren't record-oriented will perform poorly unless they're acquiring data in
> their natural batches as they arrive from Kafka.  In short, use the
> record-oriented processors for Kafka.  With them you can even deal with the
> fact you want to go from Avro to JSON and so on.  These processors have a
> tougher learning curve, but they perform extremely well and we have
> powerful processors to go along with them for common patterns.
>
>
>
> You absolutely should be able to get to the big numbers you have seen.  It
> requires great flow design (powerful machines are secondary).
>
>
>
> Thanks
>
>
>
> On Wed, Sep 23, 2020 at 9:26 AM <nathan.engl...@bt.com> wrote:
>
> Hi All,
>
>
>
> We’ve got a 3-node NiFi cluster running on 3 x 40-CPU, 256GB RAM (32GB Java
> heap) servers. However, we have only been able to achieve a consumption
> rate of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a
> production rate of ~16.84GB out of the cluster over 5 minutes. This is much
> lower than we were expecting based on what we have read. (With this
> throughput we see a CPU load of ~32 on all nodes, so we know there isn’t
> much else we can get out of the CPU.)
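>
> (For reference, that works out to roughly 9.48 GB / 300 s ≈ 32 MB/s of
> compressed input across the cluster, i.e. ~11 MB/s per node, and 16.84 GB /
> 300 s ≈ 57 MB/s of output, i.e. ~19 MB/s per node.)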
>
>
>
> We have also tried SSDs, and RAIDed and non-RAIDed HDDs, for the content
> repo storage, but they haven’t made a difference to the amount we can
> process.
>
>
>
> The process is as follows:
>
> 1.       Our flow reads from Kafka compressed (maximum of 2,000 records
> per file). It then converts them from Avro to JSON (ConsumeKafka_0_10 →
> UpdateAttribute → ConvertRecord).
>
> 2.       Depending on which topic the flow file is consumed from, we then
> send the message to one of 10 potential process groups, each containing
> between 3 and 5 processors (RouteOnAttribute → relevant processing group
> containing JoltTransformJSON and several custom processors we have made).
>
> 3.       Finally, we produce the flow file content back to one of several
> Kafka topics, based on the input topic name, in Avro format with Snappy
> compression on the Kafka topic.
>
>
>
> Inspecting the queued message counts indicates that the Jolt transforms
> are taking the time to process (large queues before the Jolt processors,
> small or no queues afterwards). But I’m not sure why this is any worse than
> the rest of the processors, as the event duration is less than a second
> when inspected in provenance. We have tuned the number of concurrent tasks,
> run durations, and schedules to get the performance we have so far.
>
>
>
> Is there anything anyone could recommend or suggest to try to make
> improvements? We need to achieve a rate around 5x what it’s currently
> processing with the same number of nodes. We are running out of ideas on
> how to accomplish this and may have to consider alternatives.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
>
