And I forgot to say - I'm using NiFi 1.20.0.
On Sun, Jul 20, 2025 at 6:56 PM Richard Beare <[email protected]>
wrote:
> Hi,
> I have a Groovy script, executed from an ExecuteGroovyScript processor, that
> I wrote a few years ago. I copied the entire group and reconfigured the
> input to point at a new source database (Postgres rather than MSSQL). The
> data is flowing through to the Groovy processor OK, but the structure of the
> records is a little different (mostly case in the field names, plus a few
> different names). Most of the field names were configured via properties to
> allow easy modification via the interface.
>
> The original workflow seems to work just fine. The new one is having
> problems retrieving fields, and I can't figure out why.
>
> The error I get is when retrieving the compression code, the field name of
> which is passed as a property.
> I'm able to explicitly retrieve the blob_id.
> The field printing I've included is only printing the last 3 fields for
> both the old and new data.
>
> What am I missing here? Apologies if this is something obvious; I'm a bit
> out of NiFi practice.
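>
> To make the failure concrete, the property-driven lookup boils down to the
> sketch below (compressfield is the dynamic property on the
> ExecuteGroovyScript processor; the logging line is just something I'd add
> while debugging, not what the script currently does):
>
> // Stripped-down version of the lookup that fails (sketch only).
> // GenericRecord.get(String) looks a field up by its exact name, so
> // "COMPRESSION_CD" and "compression_cd" are different fields, and a
> // non-matching name gives null (or an error, depending on the Avro version).
> def rawCode = currRecord.get(compressfield as String)
> log.warn("compressfield='${compressfield}' -> ${rawCode}")
> int CompressionCode = rawCode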
>
> The old input records were in this form:
> [ {
> "blobid" : 72001,
> "EVENT_ID" : 7.91947467E8,
> "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
> "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
> "BLOB_SEQ_NUM" : 1.0,
> "BLOB_LENGTH" : 976.0,
> "COMPRESSION_CD" : 728.0,
> "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
> "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
> "blob_parts" : 1.0,
> "ENCNTR_ID" : 37184618,
> "PERSON_ID" : 9114238,
> "gender" : 2,
> "age" : 92
> }, {
> "blobid" : 72002,
> "EVENT_ID" : 7.91948699E8,
> "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
> "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
> "BLOB_SEQ_NUM" : 1.0,
> "BLOB_LENGTH" : 2304.0,
> "COMPRESSION_CD" : 728.0,
>
> ...
> ]
>
> The new ones look like:
>
> [ {
> "blob_id" : 1001,
> "event_id" : 3188115,
> "person_id" : 8430038,
> "encntr_id" : 13352660,
> "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
> "creation_dt_tm" : "2011-05-19T00:39:51Z",
> "blob_seq_num" : 1,
> "blob_length" : 2252,
> "compression_cd" : 728,
> "max_sequence_nbr" : 1,
> "blob_contents" : "\u00 - binary stuff"
> }, {
> "blob_id" : 1002,
> "event_id" : 3188119,
> "person_id" : 7241448,
> "encntr_id" : 11645097,
> "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
> "creation_dt_tm" : "2011-05-19T00:39:51Z",
>
> So there are some formatting differences and field-ordering differences.
> The script only uses compression_cd and blob_contents.
>
> @Grab('org.apache.avro:avro:1.8.1')
> import org.apache.avro.*
> import org.apache.avro.file.*
> import org.apache.avro.generic.*
> import java.nio.ByteBuffer
> import org.apache.nifi.processor.io.StreamCallback // for the 'as StreamCallback' cast below
> import DecompressOcf.DecompressBlob
>
> // from https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy
>
>
> // needs docfield and compressfield in the host processor
>
> def flowFile = session.get()
> if(!flowFile) return
>
> try {
>
> flowFile = session.write(flowFile, {inStream, outStream ->
> // Defining avro reader and writer
> DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
> DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
>
> // get avro schema
> def schema = reader.schema
> // in my case I am processing address lookup table data with only one field, the address field
> // {"type":"record","name":"lookuptable","namespace":"any.data","fields":[{"name":"address","type":["null","string"]}]}
>
> // Define which schema is to be used for writing.
> // If you want to extend or change the output record format,
> // you define a new schema and specify that it shall be used for writing.
> writer.create(schema, outStream)
> log.warn(String.format("Before while loop"))
> // process record by record
> while (reader.hasNext()) {
> log.warn(String.format("Start of while loop: %s",
> compressfield))
> GenericRecord currRecord = reader.next()
> def SCH = currRecord.getSchema()
>
> def fields = SCH.getFields()
> if (fields.isEmpty() ) {
> log.warn("No fields found")
> } else {
> fields.each { field ->
> def fieldName = field.name()
> log.warn("Field name : ${fieldName}")
> }
> }
> log.warn("Got next record")
> //int I = currRecord.get("event_id")
> //log.warn(String.format(" blob_id direct %d ", I))
>
> int CompressionCode = currRecord.get(compressfield as String)
> log.warn(String.format("Got compression code"))
> ByteBuffer sblob = currRecord.get(docfield as String)
> if (CompressionCode == 728) {
> // action normally here
> } else if (CompressionCode == 727) {
> // this blob isn't compressed - strip the suffix
> // action without decompression.
> } else {
> log.error('Unknown compression code')
> }
> writer.append(currRecord)
> }
> // Create a new record
> // GenericRecord newRecord = new GenericData.Record(schema)
> // populate the record with data
> // newRecord.put("address", new org.apache.avro.util.Utf8("My street"))
> // Append a new record to avro file
> // writer.append(newRecord)
> //writer.appendAllFrom(reader, false)
> // do not forget to close the writer
> writer.close()
>
> } as StreamCallback)
>
> session.transfer(flowFile, REL_SUCCESS)
> } catch(e) {
> log.error('Error appending new record to avro file', e)
> flowFile = session.penalize(flowFile)
> session.transfer(flowFile, REL_FAILURE)
> }
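>
> In case it helps frame suggestions: the workaround I've been considering is
> to resolve the property value against the record schema case-insensitively
> before calling get(), roughly like the sketch below (untested, just to show
> the idea; resolveField is a name I made up):
>
> // Hypothetical helper: find the schema field whose name matches the
> // property value ignoring case, falling back to the raw value if none does.
> def resolveField = { GenericRecord rec, String wanted ->
>     rec.getSchema().getFields()*.name().find { it.equalsIgnoreCase(wanted) } ?: wanted
> }
> int CompressionCode = currRecord.get(resolveField(currRecord, compressfield as String))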
>