James, I'm not sure what the end goal is, but why do you need to use EvaluateJsonPath and SplitJson?
Generally you don't want to split a flow file of multiple records into 1 record per flow file, this is an anti-pattern that leads to poor performance in the flow. Thanks, Bryan On Fri, Apr 7, 2023 at 9:41 AM James McMahon <[email protected]> wrote: > > Very interesting, very helpful insights. Thank you again, Mike. > Late last night I decided to punt on a pure NiFi solution. I knew I could do > this easily with Groovy scripting, and I knew that was well-within my > wheelhouse. So that's what I did: Groovy from an ExecuteScript processor. I'm > 90% of the way there. Just a few more refinements to get just what I want, > which I'll tackle later tonight. > Groovy is pretty cool. Flexible, easily tailored to just what you need. I > like having that flexibility. And I like having options, too: your results > have motivated me to look at using QueryRecords, etc etc. > Jim > > On Fri, Apr 7, 2023 at 9:32 AM Mike Sofen <[email protected]> wrote: >> >> This is where I felt Nifi wasn’t the right tool for the job and Postgres >> was. After I imported the CSV directly into a staging table in the database >> (using Nifi), I converted the payload part of the columns into jsonb and >> stored that into the final table in a column with additional columns as >> relational data (timestamps, identifiers, etc). It was an object-relational >> data model. >> >> >> >> THEN, using the amazingly powerful Postgres jsonb functions, I was able to >> extract the unique keys in an entire dataset or across multiple datasets (to >> build a data catalog for example), perform a wide range of validations on >> individual keys, etc. I use the word amazing because they are not just >> powerful functions but they run surprisingly fast given the amount of string >> data they are traversing. >> >> >> >> Mike Sofen >> >> >> >> From: James McMahon <[email protected]> >> Sent: Thursday, April 06, 2023 2:03 PM >> To: [email protected] >> Subject: Re: Handling CSVs dynamically with NiFi >> >> >> >> Can I ask you one follow-up? I've gotten my ConvertRecord to work. I created >> a CsvReader service with Schema Access Strategy of Use String Fields From >> Header. I created a JsonRecordSetWriter service with Schema Write Strategy >> of Do Not Write Schema. >> >> When ConvertRecord is finished, my result looks like this sample: >> >> [ { >> "Bank Name�" : "Almena State Bank", >> "City�" : "Almena", >> "State�" : "KS", >> "Cert�" : "15426", >> "Acquiring Institution�" : "Equity Bank", >> "Closing Date�" : "23-Oct-20", >> "Fund" : "10538" >> }, { >> "Bank Name�" : "First City Bank of Florida", >> "City�" : "Fort Walton Beach", >> "State�" : "FL", >> "Cert�" : "16748", >> "Acquiring Institution�" : "United Fidelity Bank, fsb", >> "Closing Date�" : "16-Oct-20", >> "Fund" : "10537" >> }, { >> "Bank Name�" : "The First State Bank", >> "City�" : "Barboursville", >> "State�" : "WV", >> "Cert�" : "14361", >> "Acquiring Institution�" : "MVB Bank, Inc.", >> "Closing Date�" : "3-Apr-20", >> "Fund" : "10536" >> }] >> >> >> >> I don't really have a schema. How can I use a combination of SplitJson and >> EvaluateJsonPath to split each json object out to its own nifi flowfile, and >> to pull the json key values out to define the fields in the csv header? I've >> found a few examples through research that allude to this, but they all seem >> to have a fixed schema and they don't offer configurations for the >> SplitJson. In a case where my json keys definition changes depending on the >> lfowfile, what should JsonPathExpression be set to in the SplitJson >> configuration? >> >> >> >> On Thu, Apr 6, 2023 at 9:59 AM Mike Sofen <[email protected]> wrote: >> >> Jim – that’s exactly what I did on that “pre” step – generate a schema from >> the CSVReader and use that to dynamically create the DDL sql needed to build >> the staging table in Postgres. In my solution, there are 2 separate >> pipelines running – this pre step and the normal file processing. >> >> >> >> I used the pre step to ensure that all incoming files were from a known and >> valid source and that they conformed to the schema for that source – a very >> tidy way to ensure data quality. >> >> >> >> Mike >> >> >> >> From: James McMahon <[email protected]> >> Sent: Thursday, April 06, 2023 6:39 AM >> To: [email protected] >> Subject: Re: Handling CSVs dynamically with NiFi >> >> >> >> Thank you both very much, Bryan and Mike. Mike, had you considered the >> approach mentioned by Bryan - a Reader processor to infer schema - and >> found it wasn't suitable for your use case, for some reason? For instance, >> perhaps you were employing a version of Apache NiFi that did not afford >> access to a CsvReader or InferAvroSchema processor? >> >> Jim >> >> >> >> On Thu, Apr 6, 2023 at 9:30 AM Mike Sofen <[email protected]> wrote: >> >> Hi James, >> >> >> >> I don’t have time to go into details, but I had nearly the same scenario and >> solved it by using Nifi as the file processing piece only, sending valid CSV >> files (valid as in CSV formatting) and leveraged Postgres to land the CSV >> data into pre-built staging tables and from there did content validations >> and packaging into jsonb for storage into a single target table. >> >> >> >> In my case, an external file source had to “register” a single file (to >> allow creating the matching staging table) prior to sending data. I used >> Nifi for that pre-staging step to derive the schema for the staging table >> for a file and I used a complex stored procedure to handle a massive amount >> of logic around the contents of a file when processing the actual files >> prior to storing into the destination table. >> >> >> >> Nifi was VERY fast and efficient in this, as was Postgres. >> >> >> >> Mike Sofen >> >> >> >> From: James McMahon <[email protected]> >> Sent: Thursday, April 06, 2023 4:35 AM >> To: users <[email protected]> >> Subject: Handling CSVs dynamically with NiFi >> >> >> >> We have a task requiring that we transform incoming CSV files to JSON. The >> CSVs vary in schema. >> >> >> >> There are a number of interesting flow examples out there illustrating how >> one can set up a flow to handle the case where the CSV schema is well known >> and fixed, but none for the generalized case. >> >> >> >> The structure of the incoming CSV files will not be known in advance in our >> use case. Our nifi flow must be generalized because I cannot configure and >> rely on a service that defines a specific fixed Avro schema registry. An >> Avro schema registry seems to presume an awareness of the CSV structure in >> advance. We don't have that luxury in this use case, with CSVs arriving from >> many different providers and so characterized by schemas that are unknown. >> >> >> >> What is the best way to get around this challenge? Does anyone know of an >> example where NiFi builds the schema on the fly as CSVs arrive for >> processing, dynamically defining the Avro schema for the CSV? >> >> >> >> Thanks in advance for any thoughts.
