James,

I'm not sure what the end goal is, but why do you need to use
EvaluateJsonPath and SplitJson?

Generally you don't want to split a flow file of multiple records into
one record per flow file. This is an anti-pattern that leads to poor
performance in the flow.

Thanks,

Bryan
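
As a rough illustration of the point above (plain Python, outside NiFi, and
not something taken from this thread), the field names can be collected from
the whole JSON array in a single pass, without splitting it into one object
per file. The function name header_from_records and the sample data are
hypothetical.

```python
import json


def header_from_records(json_text):
    """Return the distinct keys across all records, in first-seen order."""
    records = json.loads(json_text)  # expects a JSON array of objects
    header = []
    seen = set()
    for record in records:
        for key in record:
            if key not in seen:
                seen.add(key)
                header.append(key)
    return header


# Hypothetical sample: the second record carries a key the first one lacks.
sample = (
    '[{"City": "Almena", "State": "KS"},'
    ' {"City": "Barboursville", "Fund": "10536"}]'
)
print(header_from_records(sample))
```

Inside NiFi the same idea would presumably be handled by a record-oriented
processor working on the whole flow file rather than by a script like this.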

On Fri, Apr 7, 2023 at 9:41 AM James McMahon <[email protected]> wrote:
>
> Very interesting, very helpful insights. Thank you again, Mike.
> Late last night I decided to punt on a pure NiFi solution. I knew I could do 
> this easily with Groovy scripting, and I knew that was well within my 
> wheelhouse. So that's what I did: Groovy from an ExecuteScript processor. I'm 
> 90% of the way there. Just a few more refinements to get just what I want, 
> which I'll tackle later tonight.
> Groovy is pretty cool. Flexible, easily tailored to just what you need. I 
> like having that flexibility. And I like having options, too: your results 
> have motivated me to look at using QueryRecord, etc.
> Jim
>
> On Fri, Apr 7, 2023 at 9:32 AM Mike Sofen <[email protected]> wrote:
>>
>> This is where I felt NiFi wasn’t the right tool for the job and Postgres 
>> was.  After I imported the CSV directly into a staging table in the database 
>> (using NiFi), I converted the payload part of the columns into jsonb and 
>> stored that in a column of the final table, alongside additional columns 
>> holding relational data (timestamps, identifiers, etc.).  It was an 
>> object-relational data model.
>>
>>
>>
>> THEN, using the amazingly powerful Postgres jsonb functions, I was able to 
>> extract the unique keys in an entire dataset or across multiple datasets (to 
>> build a data catalog for example), perform a wide range of validations on 
>> individual keys, etc.  I use the word amazing because they are not just 
>> powerful functions but they run surprisingly fast given the amount of string 
>> data they are traversing.
>>
>>
>>
>> Mike Sofen
>>
>>
>>
>> From: James McMahon <[email protected]>
>> Sent: Thursday, April 06, 2023 2:03 PM
>> To: [email protected]
>> Subject: Re: Handling CSVs dynamically with NiFi
>>
>>
>>
>> Can I ask you one follow-up? I've gotten my ConvertRecord to work. I created 
>> a CsvReader service with Schema Access Strategy of Use String Fields From 
>> Header. I created a JsonRecordSetWriter service with Schema Write Strategy 
>> of Do Not Write Schema.
>>
>> When ConvertRecord is finished, my result looks like this sample:
>>
>> [ {
>>   "Bank Name�" : "Almena State Bank",
>>   "City�" : "Almena",
>>   "State�" : "KS",
>>   "Cert�" : "15426",
>>   "Acquiring Institution�" : "Equity Bank",
>>   "Closing Date�" : "23-Oct-20",
>>   "Fund" : "10538"
>> }, {
>>   "Bank Name�" : "First City Bank of Florida",
>>   "City�" : "Fort Walton Beach",
>>   "State�" : "FL",
>>   "Cert�" : "16748",
>>   "Acquiring Institution�" : "United Fidelity Bank, fsb",
>>   "Closing Date�" : "16-Oct-20",
>>   "Fund" : "10537"
>> }, {
>>   "Bank Name�" : "The First State Bank",
>>   "City�" : "Barboursville",
>>   "State�" : "WV",
>>   "Cert�" : "14361",
>>   "Acquiring Institution�" : "MVB Bank, Inc.",
>>   "Closing Date�" : "3-Apr-20",
>>   "Fund" : "10536"
>> }]
>>
>>
>>
>> I don't really have a schema. How can I use a combination of SplitJson and 
>> EvaluateJsonPath to split each JSON object out to its own NiFi flowfile, and 
>> to pull the JSON key values out to define the fields in the CSV header? I've 
>> found a few examples through research that allude to this, but they all seem 
>> to have a fixed schema and they don't offer configurations for the 
>> SplitJson. In a case where my JSON key definitions change depending on the 
>> flowfile, what should JsonPathExpression be set to in the SplitJson 
>> configuration?
>>
>>
>>
>> On Thu, Apr 6, 2023 at 9:59 AM Mike Sofen <[email protected]> wrote:
>>
>> Jim – that’s exactly what I did on that “pre” step – generate a schema from 
>> the CSVReader and use that to dynamically create the DDL sql needed to build 
>> the staging table in Postgres.  In my solution, there are 2 separate 
>> pipelines running – this pre step and the normal file processing.
>>
>>
>>
>> I used the pre step to ensure that all incoming files were from a known and 
>> valid source and that they conformed to the schema for that source – a very 
>> tidy way to ensure data quality.
>>
>>
>>
>> Mike
>>
>>
>>
>> From: James McMahon <[email protected]>
>> Sent: Thursday, April 06, 2023 6:39 AM
>> To: [email protected]
>> Subject: Re: Handling CSVs dynamically with NiFi
>>
>>
>>
>> Thank you both very much, Bryan and Mike. Mike, had you considered the 
>> approach mentioned by Bryan - a Reader processor to infer schema  -  and 
>> found it wasn't suitable for your use case, for some reason? For instance, 
>> perhaps you were employing a version of Apache NiFi that did not afford 
>> access to a CsvReader or InferAvroSchema processor?
>>
>> Jim
>>
>>
>>
>> On Thu, Apr 6, 2023 at 9:30 AM Mike Sofen <[email protected]> wrote:
>>
>> Hi James,
>>
>>
>>
>> I don’t have time to go into details, but I had nearly the same scenario and 
>> solved it by using NiFi as the file processing piece only: it sent valid CSV 
>> files (valid as in CSV formatting) on to Postgres, which landed the CSV 
>> data into pre-built staging tables. From there I did content validations 
>> and packaged the data into jsonb for storage in a single target table.
>>
>>
>>
>> In my case, an external file source had to “register” a single file (to 
>> allow creating the matching staging table) prior to sending data.  I used 
>> NiFi for that pre-staging step to derive the schema of the staging table 
>> for a file. I used a complex stored procedure to handle a massive amount 
>> of logic around the contents of a file when processing the actual files 
>> prior to storing into the destination table.
>>
>>
>>
>> NiFi was VERY fast and efficient in this, as was Postgres.
>>
>>
>>
>> Mike Sofen
>>
>>
>>
>> From: James McMahon <[email protected]>
>> Sent: Thursday, April 06, 2023 4:35 AM
>> To: users <[email protected]>
>> Subject: Handling CSVs dynamically with NiFi
>>
>>
>>
>> We have a task requiring that we transform incoming CSV files to JSON. The 
>> CSVs vary in schema.
>>
>>
>>
>> There are a number of interesting flow examples out there illustrating how 
>> one can set up a flow to handle the case where the CSV schema is well known 
>> and fixed, but none for the generalized case.
>>
>>
>>
>> The structure of the incoming CSV files will not be known in advance in our 
>> use case. Our NiFi flow must be generalized because I cannot configure and 
>> rely on a service that defines a specific fixed Avro schema registry. An 
>> Avro schema registry seems to presume an awareness of the CSV structure in 
>> advance. We don't have that luxury in this use case, with CSVs arriving from 
>> many different providers and so characterized by schemas that are unknown.
>>
>>
>>
>> What is the best way to get around this challenge? Does anyone know of an 
>> example where NiFi builds the schema on the fly as CSVs arrive for 
>> processing, dynamically defining the Avro schema for the CSV?
>>
>>
>>
>> Thanks in advance for any thoughts.
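
For the original question of deriving the structure on the fly, here is a
minimal sketch in plain Python (outside NiFi, so not how any NiFi reader is
actually implemented). It treats the first CSV row as the header and every
value as a string, which is roughly the idea behind the "Use String Fields
From Header" strategy mentioned in the thread. The function name csv_to_json
is hypothetical. The sample output quoted in the thread shows a stray
character at the end of several keys; the sketch assumes, without having
verified it, that this is trailing whitespace such as a non-breaking space
in the CSV header, and strips it.

```python
import csv
import io
import json


def csv_to_json(csv_text):
    """Convert CSV text to a JSON array, using the first row as field names."""
    reader = csv.reader(io.StringIO(csv_text, newline=""))
    try:
        raw_header = next(reader)
    except StopIteration:
        return "[]"  # empty input: no header, no records
    # Assumption: stray trailing characters in header names are whitespace
    # (str.strip() also removes non-breaking spaces).
    header = [name.strip() for name in raw_header]
    # Every value stays a string. zip() silently drops extra cells and
    # leaves out missing ones, so ragged rows are not reported as errors.
    records = [dict(zip(header, row)) for row in reader if row]
    return json.dumps(records, indent=2)


# Hypothetical input with non-breaking spaces after two of the header names.
sample_csv = "Bank Name\u00a0,City\u00a0,Fund\nAlmena State Bank,Almena,10538\n"
print(csv_to_json(sample_csv))
```

Because the field names come from each file's own header, nothing about the
layout has to be known in advance. The trade-off is that every value is a
string, so any typing or validation has to happen in a later step.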
