Jim – that’s exactly what I did in that “pre” step: generate a schema from the 
CSVReader and use it to dynamically create the DDL SQL needed to build the 
staging table in Postgres.  My solution has two separate pipelines 
running: this pre step and the normal file processing.
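
For illustration only, here is a rough Python sketch of the schema-to-DDL idea described above. It is not the NiFi flow itself: the function names, the sampling approach, and the three-type mapping (bigint, double precision, text) are all assumptions made for this example, and real type inference would likely need more cases (dates, booleans, and so on).

```python
import csv
import io
import re

# Simple patterns for this sketch; anything that matches neither falls back to text.
INT_RE = re.compile(r"[+-]?[0-9]+")
FLOAT_RE = re.compile(r"[+-]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)(?:[eE][+-]?[0-9]+)?")


def infer_pg_type(values):
    """Pick a Postgres type that fits every non-empty sample value."""
    non_empty = [v.strip() for v in values if v.strip()]
    if not non_empty:
        return "text"
    if all(INT_RE.fullmatch(v) for v in non_empty):
        return "bigint"
    if all(FLOAT_RE.fullmatch(v) for v in non_empty):
        return "double precision"
    return "text"


def quote_ident(name):
    """Quote an identifier so arbitrary CSV header names are safe in DDL."""
    return '"' + name.replace('"', '""') + '"'


def build_staging_ddl(table_name, csv_text, sample_rows=100):
    """Build a CREATE TABLE statement from the header and a sample of rows."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    samples = [row for _, row in zip(range(sample_rows), reader)]
    columns = []
    for i, name in enumerate(header):
        values = [row[i] for row in samples if i < len(row)]
        columns.append(f"    {quote_ident(name.strip())} {infer_pg_type(values)}")
    body = ",\n".join(columns)
    return f"CREATE TABLE IF NOT EXISTS {quote_ident(table_name)} (\n{body}\n);"


if __name__ == "__main__":
    sample = "id,amount,note\n1,2.5,hello\n2,3,\n"
    print(build_staging_ddl("staging_acme", sample))
```

The table name `staging_acme` and the sample data are made up. Falling back to text for anything ambiguous keeps the staging load from failing, leaving stricter checks to a later validation step.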

 

I used the pre step to ensure that all incoming files were from a known and 
valid source and that they conformed to the schema for that source – a very 
tidy way to ensure data quality.
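
As a hedged illustration of that check, a minimal Python sketch might look like the following. The registry dictionary, the source id `acme`, and the `check_file` function are hypothetical names invented for this example; in the setup described here the registered schemas would live in Postgres, not in code.

```python
import csv
import io

# Hypothetical registry: source id -> expected column names, in order.
REGISTERED_SCHEMAS = {
    "acme": ["id", "amount", "note"],
}


def check_file(source_id, csv_text, registry=REGISTERED_SCHEMAS):
    """Return (ok, reason) for a file claimed to come from source_id."""
    expected = registry.get(source_id)
    if expected is None:
        return False, f"unknown source: {source_id}"

    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, None)
    if header is None:
        return False, "empty file"

    header = [h.strip() for h in header]
    if header != expected:
        return False, f"header mismatch: expected {expected}, got {header}"

    # Every non-blank record must have the same number of fields as the header.
    for record_no, row in enumerate(reader, start=2):
        if not row:
            continue
        if len(row) != len(expected):
            return False, (
                f"record {record_no}: expected {len(expected)} fields, "
                f"got {len(row)}"
            )
    return True, "ok"


if __name__ == "__main__":
    print(check_file("acme", "id,amount,note\n1,2.5,x\n"))
    print(check_file("unregistered", "a,b\n1,2\n"))
```

Files that fail the check can be routed aside before they ever reach the staging tables.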

 

Mike

 

From: James McMahon <[email protected]> 
Sent: Thursday, April 06, 2023 6:39 AM
To: [email protected]
Subject: Re: Handling CSVs dynamically with NiFi

 

Thank you both very much, Bryan and Mike. Mike, had you considered the approach 
mentioned by Bryan - a record Reader that infers the schema - and found it wasn't 
suitable for your use case for some reason? For instance, perhaps you were 
using a version of Apache NiFi that did not offer the CSVReader controller 
service or the InferAvroSchema processor?

Jim

 

On Thu, Apr 6, 2023 at 9:30 AM Mike Sofen <[email protected]> wrote:

Hi James,

 

I don’t have time to go into details, but I had nearly the same scenario. I 
solved it by using NiFi only as the file-processing piece, passing along valid 
CSV files (valid as in CSV formatting). Postgres then landed the CSV data in 
pre-built staging tables, and from there I did content validation and packaged 
the rows into jsonb for storage in a single target table.

 

In my case, an external file source had to “register” a single file (to allow 
creating the matching staging table) before sending data.  I used NiFi for 
that pre-staging step to derive the staging-table schema from the file. When 
processing the actual files, a complex stored procedure handled a massive 
amount of logic around the file contents before storing them in the 
destination table.

 

NiFi was VERY fast and efficient in this, as was Postgres.

 

Mike Sofen

 

From: James McMahon <[email protected]>
Sent: Thursday, April 06, 2023 4:35 AM
To: users <[email protected]>
Subject: Handling CSVs dynamically with NiFi

 

We have a task requiring that we transform incoming CSV files to JSON. The CSVs 
vary in schema.

 

There are a number of interesting flow examples out there illustrating how one 
can set up a flow to handle the case where the CSV schema is well known and 
fixed, but none for the generalized case.

 

The structure of the incoming CSV files will not be known in advance in our use 
case. Our NiFi flow must be generalized because I cannot configure and rely on 
a schema registry service that defines a specific, fixed Avro schema. An Avro 
schema registry seems to presume knowledge of the CSV structure in advance. We 
don't have that luxury in this use case: the CSVs arrive from many different 
providers, so their schemas are unknown.

 

What is the best way to get around this challenge? Does anyone know of an 
example where NiFi builds the schema on the fly as CSVs arrive for processing, 
dynamically defining the Avro schema for the CSV?

 

Thanks in advance for any thoughts.
