Hello. My incoming data is JSON. One example record looks like this:

{"id": "20230508215236_4447cd0a-9dca-47cb-90b1-6562cf34155a_Timer-Driven Process Thread-9",
"te": "0.9494",
"diskusage": "0.2776125422110003.3 MB",
"memory": 77,
"cpu": 0.58,
"host": "172.31.73.197/ip-172-31-73-197.ec2.internal",
"temperature": "97",
"macaddress": "f417ead3-4fa9-4cee-a14b-7172e9ecd3ea",
"end": "61448816405795",
"systemtime": "05/08/2023 16:52:36"}


I wrote a Groovy script, running in a NiFi ExecuteScript processor, to tally
the keys in the JSON and report the results as flowfile attributes. I am
getting this error:

ExecuteScript[id=028b1d40-33a5-1766-b659-31b3faaf13f5] Error processing json fields:
groovy.lang.MissingMethodException: No signature of method:
Script9$_run_closure1$_closure2$_closure4.doCall() is applicable for argument types: (Entry) values: [id=20230508215236_4447cd0a-9dca-47cb-90b1-6562cf34155a_Timer-Driven Process Thread-9]
Possible solutions: doCall(java.lang.Object, java.lang.Object), findAll(), findAll(), isCase(java.lang.Object), isCase(java.lang.Object)



What is causing this error? Am I reading the data incorrectly with
JsonSlurper?



Here is my code:

import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

def originalFile = ''
def keys = []
def tallyMap = [:]
def topValuesMap = [:]
def lineCount = 0

def ff = session.get()
if (!ff) return
try {
    session.read(ff, { inputStream ->
        def json = new JsonSlurper().parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))


        def keySet = new HashSet<String>()

        if (json instanceof List) {
            keySet.addAll(((Map)json[0]).keySet())
        } else {
            keySet.addAll(json.keySet())
        }

        keys = keySet.toList()
        originalFile = ff.getAttribute('filename')


        json.each { jsonObj ->
            jsonObj.each { key, value ->
                if (value != null && !value.toString().trim().isEmpty()) {
                    tallyMap[key] = tallyMap.containsKey(key) ? tallyMap[key] + 1 : 1
                    if (topValuesMap.containsKey(key)) {
                        def valuesMap = topValuesMap[key]
                        valuesMap[value] = valuesMap.containsKey(value) ? valuesMap[value] + 1 : 1
                        topValuesMap[key] = valuesMap
                    } else {
                        topValuesMap[key] = [:].withDefault{ 0 }.plus([value: 1])
                    }
                    // Remove the "value: 1" entry from the topValuesMap
                    topValuesMap[key].remove("value")
                }
            }
        }

        // Sort the topValuesMap for each key based on the frequency of values
        topValuesMap.each { key, valuesMap ->
            topValuesMap[key] = valuesMap.sort{ -it.value }.take(10)
        }


        // Count the number of JSON records
        lineCount += json.size()
    } as InputStreamCallback)

    def tallyMapString = JsonOutput.toJson(tallyMap)
    def topValuesMapString = JsonOutput.toJson(topValuesMap)

    ff = session.putAttribute(ff, 'triage.json.file', originalFile)
    ff = session.putAttribute(ff, 'triage.json.fields', keys.join(","))
    ff = session.putAttribute(ff, 'triage.json.tallyMap', tallyMapString)
    ff = session.putAttribute(ff, 'triage.json.topValuesMap', topValuesMapString)
    ff = session.putAttribute(ff, 'triage.json.lineCount', lineCount.toString())
    session.transfer(ff, REL_SUCCESS)

} catch (Exception e) {
    log.error('Error processing json fields', e)
    session.transfer(ff, REL_FAILURE)
}
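
For reference, the same exception can apparently be reproduced outside NiFi with plain Groovy. The following is only a minimal sketch, assuming the flowfile holds a single JSON object (like the example record above) rather than an array of objects. In that case JsonSlurper returns a Map, the outer each with a one-parameter closure hands over Map.Entry objects, and the inner two-parameter closure cannot accept a single Entry. The names caught, records and tally are illustrative and are not part of the script above.

```groovy
import groovy.json.JsonSlurper

// A single JSON object parses to a Map, not to a List of Maps.
def json = new JsonSlurper().parseText('{"memory": 77, "cpu": 0.58}')
assert json instanceof Map

def caught = null
try {
    // Iterating a Map with a one-parameter closure yields Map.Entry objects.
    json.each { jsonObj ->
        // Calling each on a single Entry passes that Entry as one argument,
        // which a two-parameter closure cannot accept.
        jsonObj.each { key, value -> }
    }
} catch (MissingMethodException e) {
    caught = e
}
assert caught != null

// One possible workaround (an assumption, not a confirmed fix for the script):
// wrap a lone object in a list so both input shapes are iterated the same way.
def records = (json instanceof List) ? json : [json]
def tally = [:]
records.each { record ->
    record.each { key, value ->
        tally[key] = (tally[key] ?: 0) + 1
    }
}
assert tally == [memory: 1, cpu: 1]
```

If that assumption holds, the same normalization would also be needed before json[0] and json.size() are used, since for a Map size() counts keys rather than records.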
