Nathan,

Hopefully this finds your issues resolved. If not, I do have another direction to investigate. In my experience, it's not enough to just move your content repo to SSD; I generally only see a gain if the provenance repo and the content repo are on disks of the same speed. But I might be biased, as my flows take in high volumes and have a high number of provenance inserts to process.

You can test this (if you're comfortable not recording your provenance for a while) by switching the implementation of your provenance repository to volatile (see here for more details - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#provenance-repository). It simply keeps a set number of records in memory (see here for how to configure it - https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#volatile-provenance-repository-properties), which removes any potential disk bottleneck for the provenance repo. I wouldn't want to run a production cluster like this, and keep in mind it will impact your heap space (especially if you set the number of buffered records high), but it would be a starting point for determining whether your bottleneck is disk IO for the provenance repository storage.
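As a rough sketch, the switch amounts to two lines in nifi.properties (the buffer size shown is just the documented default, not a recommendation - size it against your available heap):

    nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
    nifi.provenance.repository.buffer.size=100000

A restart is needed for the change to take effect; the old on-disk provenance data is simply no longer used.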
v/r,

Eric O'Reilly

On Fri, Sep 25, 2020 at 7:10 AM <nathan.engl...@bt.com> wrote:

Hi Matt,

We've now switched to the JoltTransformRecord processors. It does seem to be performing slightly better.

We are also trying to switch out the ConsumeKafka_0_10 and ConvertRecord processors for the ConsumeKafkaRecord_0_10 processor, based on feedback in this chain.

With the ConvertRecord processor, we used the kafka.topic attribute, which was available because the ConsumeKafka processor had already completed on the flow file. ConsumeKafkaRecord handles it all in one step, and when testing, it failed to parse because kafka.topic is not set.

As we are consuming from multiple topics, each with a unique Avro schema, the only way I can see to do this would be to have a separate processor and AvroReader controller service for each topic?

Kind Regards,

Nathan

From: Matt Burgess [mailto:mattyb...@apache.org]
Sent: 24 September 2020 17:08
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

If you have multiple JSON messages in one flow file, is it one large array, or a top-level JSON object with an array inside? Also, are you trying to transform each message or the whole thing (i.e. do you need to know about more than one message at a time)? If you have a top-level array and are transforming each element of the array, you might get better performance out of JoltTransformRecord rather than JoltTransformJSON, as the latter reads the entire file into memory as a single JSON entity. If you have a top-level object then they will both read the whole thing in.

Regards,
Matt

On Thu, Sep 24, 2020 at 10:25 AM <nathan.engl...@bt.com> wrote:

Hi Mark,

From what I can see (based on queues building up before the processor and running basically empty after it), it is the Jolt processor we have problems with. We have tried adding more concurrency, reducing the run schedule and increasing the run duration, but that didn't resolve the high CPU load (~32 when processing at the rates described in my first email; when no traffic is processing it sits at 0.2).

It could be completely the wrong way of diagnosing this! I've struggled to find information (apart from your great videos) to assist in getting to the bottom of it.

Kind Regards,

Nathan

From: Mark Payne [mailto:marka...@hotmail.com]
Sent: 24 September 2020 15:12
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Hey Nathan,

A quick clarification - is the bottleneck / slow point in your flow actually consuming from Kafka, or Jolt? From your original message it sounds like the bottleneck may actually be the Jolt Transform processor.

If the problem is in the ConsumeKafka processor, one thing you'll want to look at is in the Settings tab: set the Yield Duration to "0 sec". That can make a huge difference in performance for the Kafka processors.

Thanks
-Mark

On Sep 24, 2020, at 10:07 AM, nathan.engl...@bt.com wrote:

Hi Bryan,

Thanks for this. My understanding of concurrent tasks was incorrect - I thought the number was across the whole cluster, not per node.

I did spend some time looking at the code for the demarcator, as we had issues getting it batching. I think there may be a slight misunderstanding between my description and how it sounds.
When I say an empty string, the message demarcator isn't blank. I have used the 'Set Empty String' checkbox, which (from memory) means the processor treats the field as null. If I left the field empty (checkbox not selected), it was one Kafka message to one flow file, which was a massive bottleneck.

I also seem to remember, from when I looked at the code, that the ConsumeKafkaRecord processors default the demarcator to null.

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbe...@gmail.com]
Sent: 24 September 2020 14:54
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Regarding the batching: I would have to double-check the code, but since you said the demarcator is empty string, I think that means it is not batching, and is putting one message in one flow file. Basically, if a demarcator is not set then batch size is ignored.

Regarding the processors/tasks, let's take one topic with 11 partitions as an example. If you make a consumer processor for this topic with 1 concurrent task, then you have 3 instances of this processor (since you have a 3-node cluster), so you might end up with something like this...

node 1 - ConsumeKafka
  Task 1 - 4 partitions

node 2 - ConsumeKafka
  Task 1 - 4 partitions

node 3 - ConsumeKafka
  Task 1 - 3 partitions

It may not be exactly like that, but that's an example of how it should be assigned.

To add more parallelism you could then increase concurrent tasks, up to maybe 4, and you get something like this...

node 1 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 2 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - 1 partition

node 3 - ConsumeKafka
  Task 1 - 1 partition
  Task 2 - 1 partition
  Task 3 - 1 partition
  Task 4 - nothing

If you go higher than 4 concurrent tasks you will just end up creating more consumers than partitions, and there is nothing to assign to them.

On Thu, Sep 24, 2020 at 9:30 AM <nathan.engl...@bt.com> wrote:

Hi Bryan,

We have configured the processor to read a maximum batch size of 2k messages, and the flow files do seem to contain more than one Kafka message each.

Completely understood on the load balancing; we tried several iterations of one task per topic partition. However, we still found the load skewed towards one specific node. I will try splitting it into multiple processors to see if that handles it any better. We have 10 topics, each with 11 partitions (one topic has only 2 partitions). So should I set concurrent tasks to 1, with multiple processors (one processor per topic)?

Kind Regards,

Nathan

From: Bryan Bende [mailto:bbe...@gmail.com]
Sent: 24 September 2020 13:59
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

I'm not sure how many topics you have, but the biggest improvement would probably be to not do a single message per flow file. You want to batch together lots of messages coming out of ConsumeKafka using a demarcator, and then convert/transform them in batches. You may need to have a separate consumer processor for each topic in order to do this correctly.

You generally shouldn't need to use a load-balanced connection after ConsumeKafka.
If one node is being favored, it is likely that the number of partitions in your topic is not lining up with (# of NiFi nodes) x (# of concurrent consumer tasks). In the simplest case, if your topic had one partition and you have 3 NiFi nodes each running ConsumeKafka with 1 concurrent task, then all your messages will only get consumed on one of the nodes. If you have 3 partitions then it should be roughly equal. In your case you have 3 nodes with concurrent tasks set to 12, so you potentially have 36 consumers, which means that if you have anything less than 36 partitions it is not going to be balanced equally.
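To make that arithmetic concrete, here is a rough check using the figures quoted in this thread (3 nodes, 12 concurrent tasks, 11 partitions per topic):

    total consumers      = 3 nodes x 12 tasks = 36
    partitions per topic = 11
    idle consumers       = 36 - 11 = 25 (for any single topic)

So at most 11 of the 36 consumers can be assigned a partition of a given topic, and Kafka makes no guarantee that those 11 land evenly across the 3 NiFi nodes.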
On Thu, Sep 24, 2020 at 3:54 AM <nathan.engl...@bt.com> wrote:

Hi Joe,

The RAID arrays seem to give us good IOPS numbers when we've tested them. We have seen 300ms wait times on the content repo, which is why we tried SSDs for the content repo - we assumed that was the bottleneck. The other RAID arrays seemed OK to us.

I will certainly look into the ConsumeKafkaRecord processor today and will come up with a solution to use it. I will feed back on what I find.

In regards to our current flow configuration, we have the following high-level process groups:

- ConsumeData - consumes the data from Kafka, adds some additional attributes, and converts from Avro to JSON.
- TransformData - this is where the majority of the work happens. Flow files are routed on an attribute to a sub-process group based on the type of data (decided from the kafka.topic attribute), where the processing happens (explained in more detail below).
- ProduceData - publishes the record back to Kafka using the PublishKafkaRecord processor, adds some attributes to calculate the processing duration, and logs it.
- Error Handling - a general catch-all. Luckily we don't see this triggered very often, but it logs a message and writes the file to disk.

<image001.png>

ConsumeData process group:

- ConsumeKafka, as mentioned in earlier emails, consumes the messages from multiple Kafka topics with a Message Demarcator of an empty string and Max Poll Records set to 2,000.
- The Consumed Flows queue uses round-robin load balancing, as we found one node was favoured for consumption from Kafka and wanted to distribute the load.
- AddAttributes uses UpdateAttribute to add additional information to the flow file (data type, output Kafka topic) based on the kafka.topic attribute.
- Finally, ConvertRecord converts the flow file content from Avro to JSON using the AvroReader and JsonRecordSetWriter controller services. In addition, we use the built-in AvroSchemaRegistry.

<image002.png>

TransformData process group (due to its size, it's not easy to show in a screenshot):

- Flow files are routed by attribute to 1 of 10 processing groups (based on the kafka.topic attribute).
- Each data type has its own processing group; I have gone into detail on one below.
- There are two failure output ports - one for initial routing issues, another for any errors that happen in the sub-processing groups. There is also the successful output port.

<image003.png>

HTTP data type sub-processing group (inside TransformData):

- The sub-processing groups all follow a somewhat similar layout, but have different custom processors after the initial Jolt transformation.
- The Jolt transformation does the majority of the heavy lifting in converting the message into the right format to match the output Avro schema requirements. The Jolt spec contains a chain of operations including modify-overwrite-beta, shift, and default (a sketch of such a chain is included after the tables below).
- Each processor after the Jolt transformation does something different to meet a specific requirement not possible with other NiFi processors. Some, like the IPAddressProcessor, are reused in the processing for other data types. We are planning to move to a single post-processing group that all data types are routed through before being published to Kafka.

<image004.png>

ProduceData produces the messages back to the different Kafka topics defined in the output.topic attribute (set in the consume data stage):

- PublishKafkaRecord converts the message from JSON back to Avro (a different schema to the input) using the JsonTreeReader and AvroRecordSetWriter controller services. We have set it to Guarantee Single Node Delivery, and use snappy compression.
- Set Exit Attributes adds a final processing time to the flow file.
- Calculate Duration uses this to work out the overall processing time.
- Log Success finally logs it to file so we can analyse the full processing time.

<image005.png>

Below are the scheduling settings we are using for the processors:

- ConsumeKafka (Consume Data): 12 concurrent tasks, run duration 0, yield duration 1 ms; Message Demarcator = empty string ('Set empty string' checked), Max Poll Records = 2,000
- AddAttributes (Consume Data): 10 concurrent tasks, run duration 0, yield duration 1 s
- ConvertRecord (Consume Data): 10 concurrent tasks, run duration 0, yield duration 1 s
- RouteOnAttribute (TransformData): 10 concurrent tasks, run duration 0, yield duration 1 s
- HTTP Jolt (TransformData): 5 concurrent tasks, run duration 0, yield duration 200 ms
- HTTP Post Processor (TransformData): 2 concurrent tasks, run duration 0, yield duration 1 s
- PublishKafka (Produce Data): 10 concurrent tasks, run duration 0, yield duration 1 s; Max Request Size = 1 MB, Compression Type = snappy, batch.size = 80,000 and linger.ms = 50,000 (both based on some custom producers we have written in the past)
- Set Exit Attributes (Produce Data): 10 concurrent tasks, run duration 0, yield duration 1 s
- Calculate Duration (Produce Data): 10 concurrent tasks, run duration 0, yield duration 1 s
- Log Success (Produce Data): 10 concurrent tasks, run duration 0, yield duration 1 s

The queue configs are as follows:

- After ConsumeKafka: back pressure object threshold 20,000, size threshold 1 GB, round-robin load balancing
- After RouteOnAttribute, and all other queues in TransformData: 10,000, 1 GB, no load balancing
- All failure routes: 10,000, 1 GB, no load balancing
- Between ConsumeData and TransformData, between TransformData and ProduceData, all other queues in ConsumeData, before RouteOnAttribute, and all queues in ProduceData: 20,000, 1 GB, no load balancing

We also have a Maximum Timer Driven Thread Count of 200 set.
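For illustration, a minimal sketch of the shape of such a chained Jolt spec (the field names here are purely hypothetical, not our real spec):

    [
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "timestamp": "=toLong"
        }
      },
      {
        "operation": "shift",
        "spec": {
          "timestamp": "event.time",
          "srcIp": "event.source.ip"
        }
      },
      {
        "operation": "default",
        "spec": {
          "event": { "type": "http" }
        }
      }
    ]

The modify-overwrite-beta step coerces types in place, the shift step moves fields into the output layout, and the default step fills in anything the input didn't provide.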
Stateless NiFi would fit this perfectly, but from what I have read it's not available in v1.9.2? We are stuck on 1.9.2, as we are using the Zookeeper provided as part of a Cloudera cluster. However, I'm wondering if we would be better suited to the volatile repositories than writing to disk?

Thanks again for the advice so far, Joe - you've given us some confidence that it's us doing something wrong and not something with NiFi.

Kind Regards,

Nathan

From: Joe Witt [mailto:joe.w...@gmail.com]
Sent: 23 September 2020 19:05
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

Not sure what read/write rates you'll get in these RAID-10 configs, but generally this seems like it should be fine (at least in the 100s of MB/sec per node range), whereas now you're seeing about 20MB/sec/node. This is definitely very low.

If you review http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-2-6-nar/1.12.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/index.html then you'll see that we do actually capture attributes such as kafka.topic and so on, and flow files would be properly grouped by topic. What I'm not positive about is whether it could handle reading from multiple topics at the same time while also honoring and determining each of their distinct schemas - I would need to test/verify that scenario to be sure. If you do have a bunch of topics, and they could grow/change, then keeping the single-processor approach makes sense. If you can go the route of one ConsumeKafkaRecord per topic then obviously that would work well.

Without seeing your flow, though, I cannot be certain where the bottleneck(s) exist and provide guidance. But this is without a doubt a vital skill for achieving maximum performance.

You'd have to show/share a ton more details for folks here to be helpful in walking through the full design. Or explain the end-to-end flow.

As additional food for thought: if the flows are indeed 'from kafka -> do stuff -> back to kafka', this is likely a great use case for stateless-nifi.

Thanks

On Wed, Sep 23, 2020 at 10:43 AM <nathan.engl...@bt.com> wrote:

Hi Joe,

Thanks for getting back to me so quickly.

Our disk setup is as follows:

- / : 100GB OS SSD, ext4, 89.9GB - OS, NiFi install, logs
- /data/1/ : 2 x 4TB SAS hard drives in RAID 1, ext4, 3.7TB - database and flowfile repos
- /data/2/ : 8 x 4TB SAS hard drives in RAID 10, ext4, 14.6TB - content repo
- /data/3/ : 2 x 4TB SAS hard drives in RAID 1, ext4, 3.7TB - provenance repo
- /ssd : 1 x 4TB PCIe NVMe SSD, ext4, 3.7TB - content repo (used instead of /data/2/ as a test, to see whether the CPU was bottlenecked by disk operations)

I will certainly take a look at those. One question with the consume record processor is how I would consume from multiple topics and ensure the correct Avro schema is used to deserialise each message. We have a 1:1 mapping of schemas to topics. At the moment the ConsumeKafka processor is reading from all topics in one consumer, and I'm assuming the kafka.topic attribute doesn't exist at that stage? We use the AvroSchemaRegistry controller service, as we don't have a schema registry in place yet.
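For context, this is roughly how we wire up the schema lookup today on the ConvertRecord path, where kafka.topic is available as an attribute (a sketch of our configuration, assuming registry entries named after the topics):

    AvroReader
      Schema Access Strategy = Use 'Schema Name' Property
      Schema Name            = ${kafka.topic}

    AvroSchemaRegistry
      <topic name> = <the Avro schema for that topic>

The open question is whether the same approach can work when the record reader runs inside ConsumeKafkaRecord, where the attribute has not been set yet.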
Kind Regards,

Nathan

From: Joe Witt [mailto:joe.w...@gmail.com]
Sent: 23 September 2020 17:33
To: users@nifi.apache.org
Subject: Re: NiFi V1.9.2 Performance

Nathan,

Your machines are plenty powerful enough to hit super high speeds, but what I cannot tell is how the disks are set up, capability- and layout-wise, relative to the three repos of importance. You'll need to share those details.

That said, the design of the flow matters. The Kafka processors that aren't record-oriented will perform poorly unless they're acquiring data in its natural batches as it arrives from Kafka. In short, use the record-oriented Kafka processors. With them you can even deal with the fact that you want to go from Avro to JSON, and so on. These processors have a tougher learning curve, but they perform extremely well, and we have powerful processors to go along with them for common patterns.

You absolutely should be able to get to the big numbers you have seen. It requires great flow design (powerful machines are secondary).

Thanks

On Wed, Sep 23, 2020 at 9:26 AM <nathan.engl...@bt.com> wrote:

Hi All,

We've got a 3-node NiFi cluster running on 3 x 40 CPU, 256GB RAM (32GB Java heap) servers. However, we have only been able to achieve consumption of ~9.48GB compressed (38.53GB uncompressed) over 5 minutes, with a production rate of ~16.84GB out of the cluster over 5 minutes. This is much lower than we were expecting based on what we have read. At this throughput we see a CPU load of ~32 on all nodes, so we know there isn't much else we can get out of the CPU.

We have also tried SSDs, and RAIDed and unRAIDed HDDs, for the content repo storage, but they haven't made a difference to the amount we can process.

The process is as follows:

1. Our flow reads compressed messages from Kafka (a maximum of 2,000 records per file), then converts them from Avro to JSON (ConsumeKafka_0_10 -> UpdateAttribute -> ConvertRecord).
2. Depending on which topic the flow file is consumed from, we then send the message to one of 10 potential process groups, each containing between 3 and 5 processors (RouteOnAttribute -> the relevant processing group, containing JoltTransformJSON and several custom processors we have made).
3. Finally, we produce the flow file content back to one of several Kafka topics, based on the input topic name, in Avro format with snappy compression on the Kafka topic.

Inspecting the queued message counts indicates that it is the Jolt transforms that take the time to process (large queues before the Jolt processors, small or no queues afterwards). But I'm not sure why these are any worse than the rest of the processors, as the event duration is less than a second when inspecting it in provenance. We have tuned the numbers of concurrent tasks, durations and schedules to get the performance we have so far.

I'm not sure if there is anything anyone could recommend or suggest to try to make improvements? We need to achieve a rate around 5x what we're currently processing, with the same number of nodes.
We are running out of ideas on how to accomplish this and may have to consider alternatives.

Kind Regards,

Nathan