Part of my problem may be that the web bulletin doesn't catch everything.
The log (retrieved with docker log) seems more complete.

I think I have a lead on the problem, and would appreciate advice on how to
deal with it. The numeric columns that are failing are displaying correctly
when I display formatted data on the web page, but are giving errors when
displaying from groovy - see below for some log file output and the code
producing it:

The thing these fields have in common is that in postgres they are defined
as numeric, whereas blob_id is defined as bigint. The numeric columns are
imported from an oracle DB so the format is more general and perhaps less
optimal than it should be. Any suggestions on whether I can deal with this
in groovy, or do I need to reformat my sources?

               fields.each { field ->
                  def fieldName = field.name()
                  def value = currRecord.get(fieldName)
                  log.error("Field name : ${fieldName} ${value}")
               }

2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
blob_id 165000
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
event_id java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
person_id java.nio.HeapByteBuffer[pos=0 lim=4 cap=4]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
encntr_id java.nio.HeapByteBuffer[pos=0 lim=4 cap=4]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
valid_from_dt_tm 1306278442000
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
creation_dt_tm 1306278442000
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
blob_seq_num java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
blob_length java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
compression_cd java.nio.HeapByteBuffer[pos=0 lim=2 cap=2]
2025-07-21 06:55:39,992 ERROR [Timer-Driven Process Thread-25]
o.a.n.p.groovyx.ExecuteGroovyScript
ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
max_sequence_nbr java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]


On Mon, Jul 21, 2025 at 9:50 AM Richard Beare <[email protected]>
wrote:

> That logging is part of my debug attempt, rather than the original.
> The script is pretty simple - it is checking the compression_cd field (the
> name of which is defined in a property), and depending on the value of this
> field, hands off the content of the blob_content field (name also defined
> in a property) to one of two processing branches (not shown).
>
> The failure with the new data format occurs when attempting to retrieve
> the compression_cd field, with errors about failing to cast null to integer.
>
> I'm attempting to print all the available fields with the code you quoted.
> #############
> For the old data format it produced:
> 23:13:58 BST
> WARNING
> fbad38e4-5811-1f5d-5d24-92673e573781
>
> ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
> PERSON_ID
>
> 23:13:58 BST
> WARNING
> fbad38e4-5811-1f5d-5d24-92673e573781
>
> ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
> gender
>
> 23:13:58 BST
> WARNING
> fbad38e4-5811-1f5d-5d24-92673e573781
>
> ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
> age
>
> 23:13:58 BST
> WARNING
> fbad38e4-5811-1f5d-5d24-92673e573781
>
> ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Got next
> record
>
> 23:13:58 BST
> WARNING
> fbad38e4-5811-1f5d-5d24-92673e573781
>
> ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Got
> compression cod
> ##############
> For the new
> ##############
> 0:49:21 BST
> WARNING
> 219801af-0198-1000-88b2-b2d88ce64a3a
>
> ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
> compression_cd
>
> 00:49:21 BST
> WARNING
> 219801af-0198-1000-88b2-b2d88ce64a3a
>
> ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
> max_sequence_nbr
>
> 00:49:21 BST
> WARNING
> 219801af-0198-1000-88b2-b2d88ce64a3a
>
> ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
> blob_contents
>
> 00:49:21 BST
> WARNING
> 219801af-0198-1000-88b2-b2d88ce64a3a
>
> ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Got next
> record
>
> 00:49:21 BST
> ERROR
> 219801af-0198-1000-88b2-b2d88ce64a3a
>
> ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Error
> appending new record to avro file:
> org.codehaus.groovy.runtime.typehandling.GroovyCastException: Cannot cast
> object 'null' with class 'null' to class 'int'. Try 'java.lang.Integer'
> instead
> ##############
> The strange thing is that the blob content field name isn't displayed in
> the output from the old format, yet I'm able to pass the content to the
> subsequent processing steps. Also, blob_id isn't listed in the new format
> output, but I'm able to retrieve it. compression_cd is listed, but I don't
> seem to be able to fetch it, even if I hardcode the name
>
> On Mon, Jul 21, 2025 at 12:57 AM Joe Witt <[email protected]> wrote:
>
>> Richard,
>>
>> It isn't obvious to me what you're trying to achieve vs what is currently
>> happening but I'd be interested to know what the output for this part of
>> the code is
>>
>>             def fields = SCH.getFields()
>>             if (fields.isEmpty() ) {
>>                log.warn("No fields found")
>>             } else {
>>                fields.each { field ->
>>                   def fieldName = field.name()
>>                   log.warn("Field name : ${field.name()}")
>>                }
>>             }
>>
>> Is the compression_id field there?
>>
>> Thanks
>>
>> On Sun, Jul 20, 2025 at 2:10 AM Richard Beare <[email protected]>
>> wrote:
>>
>>> And I forgot to say - I'm using nifi 1.20.0
>>>
>>> On Sun, Jul 20, 2025 at 6:56 PM Richard Beare <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>> I have a groovy script executed from an ExecuteGroovyScript processor
>>>> that I wrote a few years ago. I copied the entire group and reconfigured
>>>> the input to point at a new source database (postgres rather than mssql).
>>>> The data is flowing through to the groovy processor OK, but the structure
>>>> of records is a little different (mostly case in the field names, with a
>>>> few different names). Most of the names were configured via properties to
>>>> allow easy modification via the interface.
>>>>
>>>> The original workflow seems to work just fine. The new one is having
>>>> problems retrieving fields, and I can't figure out why.
>>>>
>>>> The error I get is when retrieving the compression code, the field name
>>>> of which is passed as a property.
>>>> I'm able to explicitly retrieve the blob_id
>>>> The field printing I've included is only printing the last 3 fields for
>>>> for the old and new data.
>>>>
>>>> What am I missing here. Apologies if this is something obvious, I'm a
>>>> bit out of nifi practice.
>>>>
>>>> The old input records were in this form:
>>>> [ {
>>>>   "blobid" : 72001,
>>>>   "EVENT_ID" : 7.91947467E8,
>>>>   "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
>>>>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>>>>   "BLOB_SEQ_NUM" : 1.0,
>>>>   "BLOB_LENGTH" : 976.0,
>>>>   "COMPRESSION_CD" : 728.0,
>>>>   "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
>>>>   "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
>>>>   "blob_parts" : 1.0,
>>>>   "ENCNTR_ID" : 37184618,
>>>>   "PERSON_ID" : 9114238,
>>>>   "gender" : 2,
>>>>   "age" : 92
>>>> }, {
>>>>   "blobid" : 72002,
>>>>   "EVENT_ID" : 7.91948699E8,
>>>>   "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
>>>>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>>>>   "BLOB_SEQ_NUM" : 1.0,
>>>>   "BLOB_LENGTH" : 2304.0,
>>>>   "COMPRESSION_CD" : 728.0,
>>>>
>>>> ...
>>>> ]
>>>>
>>>> The new ones look like:
>>>>
>>>> [ {
>>>>   "blob_id" : 1001,
>>>>   "event_id" : 3188115,
>>>>   "person_id" : 8430038,
>>>>   "encntr_id" : 13352660,
>>>>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>>>>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>>>>   "blob_seq_num" : 1,
>>>>   "blob_length" : 2252,
>>>>   "compression_cd" : 728,
>>>>   "max_sequence_nbr" : 1,
>>>>   "blob_contents" : "\u00 - binary stuff"
>>>> }, {
>>>>   "blob_id" : 1002,
>>>>   "event_id" : 3188119,
>>>>   "person_id" : 7241448,
>>>>   "encntr_id" : 11645097,
>>>>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>>>>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>>>>
>>>> So there is some formatting difference, and field ordering difference.
>>>> The script only uses compression_cd and blob_contents
>>>>
>>>> @Grab('org.apache.avro:avro:1.8.1')
>>>> import org.apache.avro.*
>>>> import org.apache.avro.file.*
>>>> import org.apache.avro.generic.*
>>>> import java.nio.ByteBuffer
>>>> import DecompressOcf.DecompressBlob
>>>>
>>>> // from
>>>> https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy
>>>>
>>>>
>>>> // needs docfield and compressfield in the host processor
>>>>
>>>> def flowFile = session.get()
>>>> if(!flowFile) return
>>>>
>>>> try {
>>>>
>>>>     flowFile = session.write(flowFile, {inStream, outStream ->
>>>>         // Defining avro reader and writer
>>>>         DataFileStream<GenericRecord> reader = new
>>>> DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
>>>>         DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new
>>>> GenericDatumWriter<GenericRecord>())
>>>>
>>>>         // get avro schema
>>>>         def schema = reader.schema
>>>>         // in my case I am processing a address lookup table data with
>>>> only one field the address field
>>>>         //
>>>> {"type":"record","name":"lookuptable","namespace":"any.data","fields":[{"name":"address","type":["null","string"]}]}
>>>>
>>>>         // Define which schema to be used for writing
>>>>         // If you want to extend or change the output record format
>>>>         // you define a new schema and specify that it shall be used
>>>> for writing
>>>>         writer.create(schema, outStream)
>>>>         log.warn(String.format("Before while loop"))
>>>>         // process record by record
>>>>         while (reader.hasNext()) {
>>>>             log.warn(String.format("Start of while loop: %s",
>>>> compressfield))
>>>>             GenericRecord currRecord = reader.next()
>>>>             def SCH = currRecord.getSchema()
>>>>
>>>>             def fields = SCH.getFields()
>>>>             if (fields.isEmpty() ) {
>>>>                log.warn("No fields found")
>>>>             } else {
>>>>                fields.each { field ->
>>>>                   def fieldName = field.name()
>>>>                   log.warn("Field name : ${field.name()}")
>>>>                }
>>>>             }
>>>>             log.warn("Got next record")
>>>>             //int I = currRecord.get("event_id")
>>>>             //log.warn(String.format(" blob_id direct %d ", I))
>>>>
>>>>             int CompressionCode = currRecord.get(compressfield as
>>>> String)
>>>>             log.warn(String.format("Got compression code"))
>>>>             ByteBuffer sblob = currRecord.get(docfield as String)
>>>>             if (CompressionCode == 728) {
>>>>                // action normally here
>>>>             } else if (CompressionCode == 727) {
>>>>                 // this blob isn't compressed - strip the suffix
>>>>               // action without decompression.
>>>>             } else {
>>>>                 log.error('Unknown compression code')
>>>>             }
>>>>             writer.append(currRecord)
>>>>         }
>>>>         // Create a new record
>>>>         //   GenericRecord newRecord = new GenericData.Record(schema)
>>>>         // populate the record with data
>>>>         //   newRecord.put("address", new org.apache.avro.util.Utf8("My
>>>> street"))
>>>>         // Append a new record to avro file
>>>>         //   writer.append(newRecord)
>>>>         //writer.appendAllFrom(reader, false)
>>>>         // do not forget to close the writer
>>>>         writer.close()
>>>>
>>>>     } as StreamCallback)
>>>>
>>>>     session.transfer(flowFile, REL_SUCCESS)
>>>> } catch(e) {
>>>>     log.error('Error appending new record to avro file', e)
>>>>     flowFile = session.penalize(flowFile)
>>>>     session.transfer(flowFile, REL_FAILURE)
>>>> }
>>>>
>>>

Reply via email to