As Bryan said, knowing more about your setup and expectations would help.

But I can share some information from my own evaluation of a similar setup.
I needed a new data pipeline into a data lake; the setup of the downstream
systems was as follows.

Downstream components:

  *   Multiple Kafka topics (8+)
     *   Topics have 4+ partitions.
     *   Every topic can hold multiple message types (different schemas).
     *   Only bare Avro records are stored in the topics; the schemas are
available in the registry.

My requirements for going ahead with NiFi:

  *   I want to consume messages at a rate of at least 30,000 messages per 
second.
  *   I want no messages lost in transit during normal operations.

Results:
After evaluation, the fastest way to consume messages turned out to be this
ingress data flow:

  1.  ConsumeKafka
  2.  ConvertRecord
  3.  UpdateAttribute
  4.  MergeContent
  5.  MergeContent

Step - ConsumeKafka:
Using ConsumeKafka is the fastest way to pull records off a Kafka topic.
I also told the downstream systems to add Kafka headers to the bare records so
I could easily tell where each message belongs, along with its timestamp.
ConsumeKafkaRecord does a lot of serialization which, as Bryan mentioned,
affects performance.

Step - ConvertRecord:
ConvertRecord uses the same serialization as ConsumeKafkaRecord, but in this
step you can simply throw threads at the process to increase throughput.
I allocate 8-16 threads to the ConvertRecord processors, while only 1 thread
runs on ConsumeKafka.
Here I convert my bare Avro records to Avro messages (including the schema).

Step - UpdateAttribute:
Running ConsumeKafka and ConvertRecord generates a LOT of flowfiles, and these
need to be compacted in the end. I use UpdateAttribute to create a composite
key based on Kafka headers (messagetype+date).
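The composite key from this step can be sketched in plain Python. The header name `messagetype` and the `+` separator come from the text above; deriving the date from the Kafka record timestamp is an assumption about how the key was built:

```python
from datetime import datetime, timezone

def composite_key(headers: dict, timestamp_ms: int) -> str:
    # Mimics the UpdateAttribute step: bin records by message type and day
    # so that downstream MergeContent can correlate on this one attribute.
    msg_type = headers.get("messagetype", "unknown")
    date = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"{msg_type}+{date.strftime('%Y-%m-%d')}"

# A record of type "orders" with Kafka timestamp 1544616000000 ms (UTC)
print(composite_key({"messagetype": "orders"}, 1544616000000))
# -> orders+2018-12-12
```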

Step - MergeContent:
Merge all the incoming flowfiles based on the composite key (messagetype+date)
using the Avro merge strategy.
Here the flowfiles are compacted into bigger files with more records, for
easier management.

Step - MergeContent:
The recommendation is to have multiple merge steps instead of one big one.
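A quick back-of-envelope on why cascading merges help: each MergeContent bin only has to hold a modest number of flowfiles in memory, yet the final files still come out large. The bin sizes below are illustrative, not the configuration from the original post:

```python
# Illustrative bin sizes, not the author's actual MergeContent settings.
stage1_entries = 1000   # flowfiles merged per bin in the first MergeContent
stage2_entries = 100    # merged bundles combined by the second MergeContent

records_in = 30_000 * 60                 # one minute of traffic at 30k msg/s
files_after_stage1 = records_in // stage1_entries
files_after_stage2 = files_after_stage1 // stage2_entries

print(files_after_stage1)  # 1800 intermediate bundles
print(files_after_stage2)  # 18 final files of ~100,000 records each
```

Neither stage ever bins more than ~1,000 items at once, which keeps per-bin bookkeeping cheap compared with one merge step trying to correlate 1.8 million flowfiles directly.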


Performance Optimizations:
https://community.hortonworks.com/articles/7882/hdfnifi-best-practices-for-setting-up-a-high-perfo.html
The biggest bottleneck in this setup is the provenance repository, so running
it as volatile is a good option if lineage is not needed.
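For reference, switching the provenance repository to the in-memory implementation is a one-line change in nifi.properties (property and class name as in stock NiFi; verify against your version, and note that provenance data is lost on restart):

```properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository
```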

End results:

Throughput: 20-90k messages per second, depending on message complexity.
Running on 2 servers with 4 NiFi instances:

  *   SSD raid for flowfile repository
  *   6 SAS drives for content repository
  *   48 cores
  *   200GB of RAM.

________________________________
From: Bryan Bende <[email protected]>
Sent: Wednesday, December 12, 2018 2:33 PM
To: [email protected]
Subject: Re: How to consume avro messages with schema reference from Kafka, 
into large flowfiles

From your original email, option 1 is the correct approach. You are
right that it is performing extra deserialization/serialization, but
this is necessary to deal with the encoded schema references which
really are modified Avro records.

In option 2, if you take a whole bunch of records where the content is
"schema ref + bare avro" and then you merge them together one after
another, there is nothing that understands how to read this, it is not
valid Avro, and there are no readers that expect multiple messages
like this in a single flow file, that is why PutParquet can't read it.
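To see why the concatenation fails: each Kafka message is a small registry header followed by a bare Avro datum. The sketch below uses a Confluent-style 5-byte header purely for illustration (the HWX reference layout differs, so the framing here is an assumption); the point is that neither stripping nor concatenating these payloads yields an Avro container file, which starts with the "Obj\x01" magic and embeds the writer schema plus sync markers:

```python
import struct

def split_wire_message(message: bytes) -> tuple[int, bytes]:
    # Illustrative Confluent-style framing: 1 magic byte + 4-byte big-endian
    # schema id, then the bare Avro datum. Real HWX framing differs.
    magic, schema_id = struct.unpack(">BI", message[:5])
    return schema_id, message[5:]

msg = bytes([0]) + struct.pack(">I", 7) + b"\x02\x08data"  # id 7 + bare datum
schema_id, datum = split_wire_message(msg)
print(schema_id)  # 7
print(datum)      # b'\x02\x08data' -- not a self-describing Avro file
# b"".join(many such datums) is still not valid Avro: there is no "Obj\x01"
# header, no embedded schema, and no sync markers, which is why a reader
# such as PutParquet's AvroReader rejects the merged content.
```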

In order to understand the slowdown, we will need more info:

- What version of Kafka broker are you using?
- Are you using the corresponding version of the record processor?
(i.e. if broker is 1.0.0 then should use ConsumeKafkaRecord_1_0_0)
- How many partitions does your topic have?
- How many nodes in your NiFi cluster?
- How many concurrent tasks configured for ConsumeKafkaRecord?
- What is the record batch size for ConsumeKafkaRecord?

On Wed, Dec 12, 2018 at 5:07 AM Krzysztof Zarzycki <[email protected]> wrote:
>
> Hello,
> I'm just pulling this thread up; if someone knows how to make the Avro
> message consumption faster, I would be grateful.
> Some more info: when we switched from ConsumeKafka with JSONs to
> ConsumeKafkaRecord with Avro messages, we experienced a serious slowdown
> (multiple X). I can get more data on how large the slowdown is exactly, but
> my question about the ConsumeKafka/MergeContent-based flow becomes even more
> relevant to me.
> Or maybe I'm doing something wrong that makes ConsumeKafkaRecord so much
> slower?
>
> BTW, I'm on NiFi 1.7.1.
>
> Thank you,
> Krzysztof Zarzycki
>
>
> On Fri, Dec 7, 2018 at 10:24 PM Krzysztof Zarzycki <[email protected]> wrote:
>>
>> Hi everyone,
>> I think I have quite a standard problem and maybe the answer will be quick,
>> but I can't find it on the internet.
>> We have Avro messages in a Kafka topic, written with an HWX schema
>> reference. We're able to read them with e.g. ConsumeKafkaRecord with an
>> Avro reader.
>>
>> Now we would like to merge the smaller flowfiles into larger files, because
>> we load these files into HDFS. What combination of processors should we use
>> to get this with the highest performance?
>> Option 1: ConsumeKafkaRecord with AvroReader and AvroRecordSetWriter, then
>> MergeRecord with AvroReader/AvroRecordSetWriter. It works and seems
>> straightforward, but to me it looks like there are too many interpretations
>> and rewrites of records. Each record interpretation is an unnecessary cost
>> of deserialization and then serialization through the Java heap.
>>
>> Option 2: somehow configure ConsumeKafka and MergeContent to do this? We
>> used this combination for simple JSONs (with binary concatenation), but we
>> can't get it right with Avro messages carrying a schema reference (the
>> PutParquet processor can't read the merged files with AvroReader). On the
>> other hand, this should be the fastest approach, as there is no data
>> interpretation, just a byte-for-byte rewrite. Maybe we just haven't tried
>> the right configuration combination?
>>
>> Maybe other options?
>>
>> Thank you for any advice.
>> Krzysztof
