This is where I felt Nifi wasn’t the right tool for the job and Postgres was. After I imported the CSV directly into a staging table in the database (using Nifi), I converted the payload columns into jsonb and stored that in a single column of the final table, alongside additional relational columns (timestamps, identifiers, etc.). It was an object-relational data model.
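To make the row shape concrete: the conversion described above was done inside Postgres, and the thread does not show that code. What follows is only a Python sketch of the same idea, splitting one staging row into relational values plus a JSON payload. The column names `source_id` and `received_at` and the helper `package_row` are hypothetical, not taken from the original solution.

```python
import json

# Hypothetical relational column names; the thread does not name the real ones.
RELATIONAL_COLUMNS = ("source_id", "received_at")


def package_row(row, relational_columns=RELATIONAL_COLUMNS):
    """Split one staging row into relational values and a JSON payload string."""
    relational = {name: row[name] for name in relational_columns}
    payload = {key: value for key, value in row.items()
               if key not in relational_columns}
    # sort_keys keeps the serialized payload stable from run to run.
    return relational, json.dumps(payload, sort_keys=True)


staging_row = {
    "source_id": "fdic",                    # assumed identifier column
    "received_at": "2023-04-06T14:03:00",   # assumed timestamp column
    "Bank Name": "Almena State Bank",
    "City": "Almena",
}
relational, payload_json = package_row(staging_row)
```

Here `relational` would map to ordinary columns in the target table, and `payload_json` would go into the single jsonb column.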
THEN, using the amazingly powerful Postgres jsonb functions, I was able to extract the unique keys in an entire dataset or across multiple datasets (to build a data catalog, for example), perform a wide range of validations on individual keys, etc. I use the word amazing because they are not just powerful functions, but they run surprisingly fast given the amount of string data they are traversing.

Mike Sofen

From: James McMahon <jsmcmah...@gmail.com>
Sent: Thursday, April 06, 2023 2:03 PM
To: users@nifi.apache.org
Subject: Re: Handling CSVs dynamically with NiFi

Can I ask you one follow-up? I've gotten my ConvertRecord to work. I created a CsvReader service with Schema Access Strategy of Use String Fields From Header. I created a JsonRecordSetWriter service with Schema Write Strategy of Do Not Write Schema. When ConvertRecord is finished, my result looks like this sample:

[ {
  "Bank Name" : "Almena State Bank",
  "City" : "Almena",
  "State" : "KS",
  "Cert" : "15426",
  "Acquiring Institution" : "Equity Bank",
  "Closing Date" : "23-Oct-20",
  "Fund" : "10538"
}, {
  "Bank Name" : "First City Bank of Florida",
  "City" : "Fort Walton Beach",
  "State" : "FL",
  "Cert" : "16748",
  "Acquiring Institution" : "United Fidelity Bank, fsb",
  "Closing Date" : "16-Oct-20",
  "Fund" : "10537"
}, {
  "Bank Name" : "The First State Bank",
  "City" : "Barboursville",
  "State" : "WV",
  "Cert" : "14361",
  "Acquiring Institution" : "MVB Bank, Inc.",
  "Closing Date" : "3-Apr-20",
  "Fund" : "10536"
} ]

I don't really have a schema. How can I use a combination of SplitJson and EvaluateJsonPath to split each json object out to its own nifi flowfile, and to pull the json key values out to define the fields in the csv header? I've found a few examples through research that allude to this, but they all seem to have a fixed schema and they don't offer configurations for the SplitJson.
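For anyone following along outside NiFi, the split-and-collect-keys step asked about above can be sketched in plain Python. This is not a SplitJson or EvaluateJsonPath configuration, just an illustration of the logic: one output per array element, plus the union of keys seen across records without knowing the schema in advance. The function names `split_records` and `header_fields` are made up for this sketch, and the sample is trimmed from the output shown above.

```python
import json


def split_records(array_text):
    """Parse a top-level JSON array and return its elements as a list."""
    records = json.loads(array_text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    return records


def header_fields(records):
    """Return the union of keys across all records, in first-seen order."""
    seen = {}
    for record in records:
        for key in record:
            seen.setdefault(key, None)
    return list(seen)


sample = """[
  {"Bank Name": "Almena State Bank", "City": "Almena", "State": "KS", "Fund": "10538"},
  {"Bank Name": "First City Bank of Florida", "City": "Fort Walton Beach", "State": "FL", "Fund": "10537"}
]"""

records = split_records(sample)
# One serialized object per record, analogous to one flowfile per record.
flowfile_bodies = [json.dumps(record) for record in records]
fields = header_fields(records)
```

Because `header_fields` only looks at the keys that are actually present, the same code works when the key set changes from one input to the next.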
In a case where my json keys definition changes depending on the flowfile, what should JsonPathExpression be set to in the SplitJson configuration?

On Thu, Apr 6, 2023 at 9:59 AM Mike Sofen <mso...@runbox.com> wrote:

Jim, that’s exactly what I did on that “pre” step: generate a schema from the CSVReader and use that to dynamically create the DDL sql needed to build the staging table in Postgres. In my solution, there are 2 separate pipelines running: this pre step and the normal file processing. I used the pre step to ensure that all incoming files were from a known and valid source and that they conformed to the schema for that source, a very tidy way to ensure data quality.

Mike

From: James McMahon <jsmcmah...@gmail.com>
Sent: Thursday, April 06, 2023 6:39 AM
To: users@nifi.apache.org
Subject: Re: Handling CSVs dynamically with NiFi

Thank you both very much, Bryan and Mike. Mike, had you considered the approach mentioned by Bryan - a Reader processor to infer schema - and found it wasn't suitable for your use case, for some reason? For instance, perhaps you were employing a version of Apache NiFi that did not afford access to a CsvReader or InferAvroSchema processor?

Jim

On Thu, Apr 6, 2023 at 9:30 AM Mike Sofen <mso...@runbox.com> wrote:

Hi James,

I don’t have time to go into details, but I had nearly the same scenario and solved it by using Nifi as the file processing piece only, sending valid CSV files (valid as in CSV formatting), and leveraged Postgres to land the CSV data into pre-built staging tables and from there did content validations and packaging into jsonb for storage into a single target table. In my case, an external file source had to “register” a single file (to allow creating the matching staging table) prior to sending data.
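The "register a file, then build the matching staging table" idea can be illustrated with a small Python sketch that turns a CSV header into staging-table DDL. The thread does not show how the actual DDL was generated, so everything here is an assumption: the helper names `quote_ident` and `staging_ddl`, the table name `staging_banklist`, and the choice to type every column as `text`.

```python
import csv
import io


def quote_ident(name):
    """Quote a SQL identifier: wrap in double quotes, double any embedded quotes."""
    return '"' + name.strip().replace('"', '""') + '"'


def staging_ddl(table_name, csv_text):
    """Build a CREATE TABLE statement from the header row of a CSV document."""
    header = next(csv.reader(io.StringIO(csv_text)))
    # Assumption: every staging column is text; validation happens later.
    columns = ",\n".join(f"    {quote_ident(col)} text" for col in header)
    return f"CREATE TABLE {quote_ident(table_name)} (\n{columns}\n);"


registered_file = "Bank Name,City,State,Cert\nAlmena State Bank,Almena,KS,15426\n"
ddl = staging_ddl("staging_banklist", registered_file)
```

Landing everything as `text` keeps the load step from failing on unexpected values, which leaves type and content checks to a later stage.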
I used Nifi for that pre-staging step to derive the schema for the staging table for a file, and I used a complex stored procedure to handle a massive amount of logic around the contents of a file when processing the actual files prior to storing into the destination table. Nifi was VERY fast and efficient in this, as was Postgres.

Mike Sofen

From: James McMahon <jsmcmah...@gmail.com>
Sent: Thursday, April 06, 2023 4:35 AM
To: users <users@nifi.apache.org>
Subject: Handling CSVs dynamically with NiFi

We have a task requiring that we transform incoming CSV files to JSON. The CSVs vary in schema. There are a number of interesting flow examples out there illustrating how one can set up a flow to handle the case where the CSV schema is well known and fixed, but none for the generalized case. The structure of the incoming CSV files will not be known in advance in our use case.

Our nifi flow must be generalized because I cannot configure and rely on a service that defines a specific fixed Avro schema registry. An Avro schema registry seems to presume an awareness of the CSV structure in advance. We don't have that luxury in this use case, with CSVs arriving from many different providers and so characterized by schemas that are unknown.

What is the best way to get around this challenge? Does anyone know of an example where NiFi builds the schema on the fly as CSVs arrive for processing, dynamically defining the Avro schema for the CSV?

Thanks in advance for any thoughts.
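As a point of comparison for the original question, here is a minimal Python sketch of schema-free CSV to JSON conversion, where the field names come from whatever header each file carries and every value is kept as a string. It is not a NiFi flow and does not produce an Avro schema. The function name `csv_to_json` is made up for this sketch, and the sample rows are taken from the data quoted earlier in the thread.

```python
import csv
import io
import json


def csv_to_json(csv_text):
    """Convert CSV text to a JSON array, using the header row as field names."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Each row becomes one object; all values stay strings, no types inferred.
    return json.dumps([dict(row) for row in reader], indent=2)


incoming = (
    "Bank Name,City,State,Acquiring Institution\n"
    "Almena State Bank,Almena,KS,Equity Bank\n"
    'First City Bank of Florida,Fort Walton Beach,FL,"United Fidelity Bank, fsb"\n'
)
result = csv_to_json(incoming)
```

Nothing in `csv_to_json` depends on a particular set of columns, so files from different providers go through the same code path.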