I did as you suggested, Matt, and attacked the problem with Groovy. Since I'm
stronger at manipulating JSON than CSV in Groovy, I first converted the
incoming CSV to JSON using a ConvertRecord processor. I inferred the record
schema from the header I assume is present in the incoming CSV. (Still TBD:
how I will handle CSV files that lack a header.)

If anyone has a similar requirement in the future, here is some Groovy code
that gets the job done. It could probably use some tidying up:

import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

def tallyMap = [:]
def topValuesMap = [:]
def lineCount = 0

def ff = session.get()
if (!ff) return
try {
    session.read(ff, { inputStream ->
        def json = new JsonSlurper().parseText(
            IOUtils.toString(inputStream, StandardCharsets.UTF_8))

        json.each { jsonObj ->
            jsonObj.each { key, value ->
                if (value != null && !value.toString().trim().isEmpty()) {
                    // Count the records in which this field is populated
                    tallyMap[key] = (tallyMap[key] ?: 0) + 1
                    // Count this value under its field, creating the
                    // per-field map the first time the field is seen
                    if (!topValuesMap.containsKey(key)) {
                        topValuesMap[key] = [:]
                    }
                    def valuesMap = topValuesMap[key]
                    valuesMap[value] = (valuesMap[value] ?: 0) + 1
                }
            }
        }

        // Sort the topValuesMap for each key based on the frequency of values
        topValuesMap.each { key, valuesMap ->
            topValuesMap[key] = valuesMap.sort{ -it.value }.take(10)
        }


        // Count the number of JSON records
        lineCount += json.size()
    } as InputStreamCallback)

    //def triageResultsString = JsonOutput.prettyPrint(JsonOutput.toJson(triageResultsMap))

    def tallyMapString = JsonOutput.prettyPrint(JsonOutput.toJson(tallyMap))
    def topValuesMapString = JsonOutput.prettyPrint(JsonOutput.toJson(topValuesMap))

    ff = session.putAttribute(ff, 'triage.csv.tallyMap', tallyMapString)
    ff = session.putAttribute(ff, 'triage.csv.topValuesMap',
                              topValuesMapString)
    ff = session.putAttribute(ff, 'triage.csv.lineCount',
                              lineCount.toString())
    session.transfer(ff, REL_SUCCESS)

} catch (Exception e) {
    log.error('Error processing csv fields', e)
    session.transfer(ff, REL_FAILURE)
}
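
The counting logic can also be tried outside NiFi, without a session or
flowfile. What follows is only a minimal standalone sketch, assuming the input
is a JSON array of flat records like the one ConvertRecord produces here. The
function name analyzeRecords, its topN parameter, and the sample data are all
made up for illustration:

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not part of the NiFi script): tallies populated
// fields and the most frequent values per field for a JSON array of records.
def analyzeRecords(String jsonText, int topN = 10) {
    def records = new JsonSlurper().parseText(jsonText)
    def tally = [:]        // field name -> records with a non-blank value
    def valueCounts = [:]  // field name -> (value -> occurrences)

    records.each { record ->
        record.each { key, value ->
            if (value != null && !value.toString().trim().isEmpty()) {
                tally[key] = (tally[key] ?: 0) + 1
                // get(key, default) stores the default map when the key is new
                def counts = valueCounts.get(key, [:])
                counts[value] = (counts[value] ?: 0) + 1
            }
        }
    }

    // Keep only the topN most frequent values per field, highest count first
    def topValues = valueCounts.collectEntries { key, counts ->
        [(key): counts.sort { -it.value }.take(topN)]
    }
    [lineCount: records.size(), tally: tally, topValues: topValues]
}

// Made-up sample data: one blank City value that should not be counted
def sample = '''[
  {"State": "KS", "City": "Almena"},
  {"State": "FL", "City": ""},
  {"State": "FL", "City": "Miami"}
]'''
def result = analyzeRecords(sample)
println result
```

Returning a plain map keeps the three results together, so the same function
could feed the three flowfile attributes or be exercised in a unit test.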

Incoming CSV looks like this (sample csv data from data.gov):

*Bank Name ,City ,State ,Cert ,Acquiring Institution ,Closing Date ,Fund*
Almena State Bank,Almena,KS,15426,Equity Bank,23-Oct-20,10538
First City Bank of Florida,Fort Walton Beach,FL,16748,"United Fidelity Bank, fsb",16-Oct-20,10537
The First State Bank,Barboursville,WV,14361,"MVB Bank, Inc.",3-Apr-20,10536
Ericson State Bank,Ericson,NE,18265,Farmers and Merchants Bank,14-Feb-20,10535
City National Bank of New Jersey,Newark,NJ,21111,Industrial Bank,1-Nov-19,10534
.......

Here is the field analysis output that results:

triage.csv.lineCount
563

triage.csv.tallyMap
{ "Bank Name\ufffd": 563, "City\ufffd": 563, "State\ufffd": 563,
  "Cert\ufffd": 563, "Acquiring Institution\ufffd": 563,
  "Closing Date\ufffd": 563, "Fund": 563 }

triage.csv.topValuesMap
{ "Bank Name\ufffd": { "The First State Bank": 3, "Premier Bank": 3,
    "First State Bank": 3, "Horizon Bank": 3, "Valley Bank": 2,
    "Frontier Bank": 2, "Summit Bank": 2, "The Park Avenue Bank": 2,
    "Legacy Bank": 2, "First National Bank": 2 },
  "City\ufffd": { "Chicago": 20, "Atlanta": 10, "Phoenix": 6, "Naples": 5,
    "Scottsdale": 4, "Las Vegas": 4, "Bradenton": 4, "Miami": 4,
    "Los Angeles": 4, "Alpharetta": 4 },
  "State\ufffd": { "GA": 93, "FL": 76, "IL": 69, "CA": 41, "MN": 23,
    "WA": 19, "AZ": 16, "MO": 16, "MI": 14, "TX": 13 },
  "Cert\ufffd": { "16748": 1, "14361": 1, "18265": 1, "21111": 1,
    "58317": 1, "58112": 1, "10716": 1, "30570": 1, "17719": 1,
    "1802": 1 },
  "Acquiring Institution\ufffd": { "No Acquirer": 31,
    "State Bank and Trust Company": 12,
    "First-Citizens Bank & Trust Company": 11, "Ameris Bank": 10,
    "U.S. Bank N.A.": 9, "Community & Southern Bank": 8,
    "Centennial Bank": 7, "Stearns Bank, N.A.": 7,
    "Bank of the Ozarks": 7, "Republic Bank of Chicago": 6 },
  "Closing Date\ufffd": { "30-Oct-09": 9, "20-Aug-10": 8, "16-Apr-10": 8,
    "22-Oct-10": 7, "23-Jul-10": 7, "30-Apr-10": 7, "23-Apr-10": 7,
    "19-Mar-10": 7, "18-Dec-09": 7, "23-Oct-09": 7 },
  "Fund": { "10537": 1, "10536": 1, "10535": 1, "10534": 1, "10533": 1,
    "10532": 1, "10531": 1, "10530": 1, "10529": 1, "10528": 1 } }

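That funky \ufffd at the end of each header field name is in the incoming
data. (U+FFFD is the Unicode replacement character, so the source file most
likely contains bytes that are not valid in the charset used to read it.)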
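
If the trailing character gets in the way downstream, one option is to strip
it from the field names before tallying. This is only a minimal sketch,
assuming the character really is U+FFFD as the escaped output suggests;
cleanFieldName and cleanRecord are made-up helper names, not anything NiFi
provides:

```groovy
// Hypothetical helper: removes U+FFFD and surrounding whitespace from a name
String cleanFieldName(String name) {
    name.replace('\uFFFD', '').trim()
}

// Rebuilds one record (a Map) with cleaned field names; values are untouched.
// If two names collide after cleaning, the later entry overwrites the earlier.
Map cleanRecord(Map record) {
    record.collectEntries { key, value ->
        [(cleanFieldName(key.toString())): value]
    }
}

// Made-up sample record mirroring the header names shown above
def record = ['Bank Name\uFFFD': 'Almena State Bank', 'Fund': '10538']
def cleaned = cleanRecord(record)
println cleaned
```

Cleaning the names once per record, before counting, keeps the attribute JSON
free of the escape sequences without touching the field values.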

Hope this helps someone in the future. Thanks again for your replies to my
initial post.
Jim

On Tue, Apr 18, 2023 at 6:38 PM James McMahon <jsmcmah...@gmail.com> wrote:

> Thanks very much for your reply, Matt. Yes sir, a Groovy script is my
> fallback option. Because we would rather build flows using "out of the NiFi
> box" processors instead of custom scripts that may need to be maintained, I
> was saving that as my last resort. But I do believe I can do it with Groovy.
>
> I have a sample CSV data set of bank info I grabbed from data.gov. I've
> successfully used ConvertRecord to convert from csv to json (Groovy maps
> work really well with json). Using Groovy, I've identified my fields:
>
> Bank Name�,City�,Closing Date�,Fund,Acquiring Institution�,Cert�,State�
> (That weird character at the end of each field name is in the data set.
> Not sure why.)
> Here are a few initial records in my massaged result:
> [ {
>   "Bank Name�" : "Almena State Bank",
>   "City�" : "Almena",
>   "State�" : "KS",
>   "Cert�" : "15426",
>   "Acquiring Institution�" : "Equity Bank",
>   "Closing Date�" : "23-Oct-20",
>   "Fund" : "10538"
> }, {
>   "Bank Name�" : "First City Bank of Florida",
>   "City�" : "Fort Walton Beach",
>   "State�" : "FL",
>   "Cert�" : "16748",
>   "Acquiring Institution�" : "United Fidelity Bank, fsb",
>   "Closing Date�" : "16-Oct-20",
>   "Fund" : "10537"
> },.........
> I am going to try iterating through that list of fields using Groovy,
> keeping a map with field name as its key and a value that is a map of
> "field value": "count", sorted. I can then extract all sorts of valuable
> metadata.
> In any case thank you for the reply. Guess I'll go the custom Groovy
> script route.
> Jim
>
> On Tue, Apr 18, 2023 at 6:17 PM Matt Burgess <mattyb...@apache.org> wrote:
>
>> Jim,
>>
>> QueryRecord uses Apache Calcite under the hood and is thus at the
>> mercy of the SQL standard (and any additional rules/dialect from
>> Apache Calcite) so in general you can't select "all except X" or "all
>> except change X to Y". Does it need to be SQL executed against the
>> individual fields? If not, take a look at ScriptedTransformRecord doc
>> (and its Additional Details page). IIRC you're a Groovy guy now ;) so
>> you should be able to alter the fields as you see fit using Groovy
>> rather than SQL (alternatively Jython as you've done a bunch of that
>> as well).
>>
>> Regards,
>> Matt
>>
>> On Tue, Apr 18, 2023 at 6:04 PM James McMahon <jsmcmah...@gmail.com>
>> wrote:
>> >
>> > Hello. I recently asked the community a question about processing CSV
>> > files. I received some helpful advice about using processors such as
>> > ConvertRecord and QueryRecord, and was encouraged to employ Readers and
>> > RecordSetWriters. I've done that, and thank all who replied.
>> >
>> > My incoming CSV files come in with different headers because they are
>> > widely different data sets. The header structure is not known in advance.
>> > As such, I configure a QueryRecord processor with a CSVReader that employs
>> > a Schema Access Strategy that is Use String Fields From Header. I configure
>> > a CSVRecordSetWriter that sets Infer Record Schema as its Schema Access
>> > Strategy.
>> >
>> > Now I want to use that QueryRecord processor to characterize the
>> > various fields using SQL. Record counts, min and max values - things of
>> > that nature. But in all the examples I find in YouTube and in the open
>> > source, the authors presume a knowledge of the fields in advance. For
>> > example Property year is set by Value select "year" from FLOWFILE.
>> >
>> > We simply don't have that luxury, that awareness in advance. After all,
>> > that's the very reason we inferred the schema in the reader and writer
>> > configuration. The fields are more often than not going to be very
>> > different. Hard wiring them into QueryRecord is not a flow solution that is
>> > flexible enough. We need to grab them from the inferred schema the Reader
>> > and Writer services identified.
>> >
>> > What syntax or notation can we use in the QueryRecord sql to say "for
>> > each field found in the header, execute this sql against that field"? I
>> > guess what I'm looking for is iteration through all the inferred schema
>> > fields, and dynamic assignment of the field name in the SQL.
>> >
>> > Has anyone faced this same challenge? How did you solve it?
>> > Is there another way to approach this problem?
>> >
>> > Thank you in advance,
>> > Jim
>>
>
