And I forgot to say: I'm using NiFi 1.20.0.

On Sun, Jul 20, 2025 at 6:56 PM Richard Beare <[email protected]>
wrote:

> Hi,
> I have a Groovy script, executed from an ExecuteGroovyScript processor, that
> I wrote a few years ago. I copied the entire group and reconfigured the
> input to point at a new source database (Postgres rather than MSSQL). The
> data is flowing through to the Groovy processor OK, but the structure of the
> records is a little different (mostly the case of the field names, plus a few
> renamed fields). Most of the field names were configured via processor
> properties to allow easy modification from the interface.
>
> The original workflow seems to work just fine. The new one is having
> problems retrieving fields, and I can't figure out why.
>
> The error I get is when retrieving the compression code, the field name of
> which is passed in as a processor property.
> I'm able to retrieve the blob_id explicitly.
> The field printing I've included only prints the last 3 fields, for both
> the old and the new data.
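>
> To illustrate, this is roughly the kind of check I mean by "field printing"
> (a simplified sketch, not the real script; currRecord and compressfield are
> the same names used in the script below). As far as I can tell,
> GenericRecord.get(String) goes through Schema.getField(), which is an exact,
> case-sensitive lookup that returns null on a miss:
>
> def wanted = compressfield as String
> def names  = currRecord.getSchema().getFields()*.name()
> log.warn("Looking for '${wanted}', schema has: ${names}")
> if (currRecord.getSchema().getField(wanted) == null) {
>     // no exact (case-sensitive) match in this record's schema
>     def loose = names.find { it.equalsIgnoreCase(wanted) }
>     log.warn("No exact match; a case-insensitive match would be: ${loose}")
> } else {
>     log.warn("Compression value: ${currRecord.get(wanted)}")
> }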
>
> What am I missing here? Apologies if this is something obvious; I'm a bit
> out of NiFi practice.
>
> The old input records were in this form:
> [ {
>   "blobid" : 72001,
>   "EVENT_ID" : 7.91947467E8,
>   "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>   "BLOB_SEQ_NUM" : 1.0,
>   "BLOB_LENGTH" : 976.0,
>   "COMPRESSION_CD" : 728.0,
>   "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
>   "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
>   "blob_parts" : 1.0,
>   "ENCNTR_ID" : 37184618,
>   "PERSON_ID" : 9114238,
>   "gender" : 2,
>   "age" : 92
> }, {
>   "blobid" : 72002,
>   "EVENT_ID" : 7.91948699E8,
>   "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>   "BLOB_SEQ_NUM" : 1.0,
>   "BLOB_LENGTH" : 2304.0,
>   "COMPRESSION_CD" : 728.0,
>
> ...
> ]
>
> The new ones look like:
>
> [ {
>   "blob_id" : 1001,
>   "event_id" : 3188115,
>   "person_id" : 8430038,
>   "encntr_id" : 13352660,
>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>   "blob_seq_num" : 1,
>   "blob_length" : 2252,
>   "compression_cd" : 728,
>   "max_sequence_nbr" : 1,
>   "blob_contents" : "\u00 - binary stuff"
> }, {
>   "blob_id" : 1002,
>   "event_id" : 3188119,
>   "person_id" : 7241448,
>   "encntr_id" : 11645097,
>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>
> So there are some formatting differences and some differences in field
> ordering. The script only uses compression_cd and blob_contents.
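>
> If it helps, the sort of standalone check I'd run on an exported flowfile to
> look at the new data outside NiFi is something like this (sketch only; the
> file path is just a placeholder, and it only touches the two fields the
> script cares about):
>
> @Grab('org.apache.avro:avro:1.8.1')
> import org.apache.avro.file.DataFileReader
> import org.apache.avro.generic.GenericDatumReader
> import org.apache.avro.generic.GenericRecord
>
> def reader = new DataFileReader<GenericRecord>(
>         new File('/tmp/sample.avro'),   // placeholder path
>         new GenericDatumReader<GenericRecord>())
> println "Schema fields: ${reader.schema.fields*.name()}"
> reader.each { GenericRecord rec ->
>     def code = rec.get('compression_cd')   // null if the name doesn't match exactly
>     def blob = rec.get('blob_contents')
>     println "compression_cd=${code} (${code?.getClass()?.simpleName}), " +
>             "blob_contents is ${blob?.getClass()?.simpleName}"
> }
> reader.close()
>
> Anyway, the full script is below: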
>
> @Grab('org.apache.avro:avro:1.8.1')
> import org.apache.avro.*
> import org.apache.avro.file.*
> import org.apache.avro.generic.*
> import java.nio.ByteBuffer
> import DecompressOcf.DecompressBlob
>
> // from https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy
>
>
> // needs docfield and compressfield in the host processor
>
> def flowFile = session.get()
> if(!flowFile) return
>
> try {
>
>     flowFile = session.write(flowFile, {inStream, outStream ->
>         // Defining avro reader and writer
>         DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
>         DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
>
>         // get avro schema
>         def schema = reader.schema
>         // in my case I am processing an address lookup table with only one field, the address field
>         // {"type":"record","name":"lookuptable","namespace":"any.data","fields":[{"name":"address","type":["null","string"]}]}
>
>         // Define which schema to be used for writing
>         // If you want to extend or change the output record format,
>         // you define a new schema and specify that it shall be used for writing
>         writer.create(schema, outStream)
>         log.warn(String.format("Before while loop"))
>         // process record by record
>         while (reader.hasNext()) {
>             log.warn(String.format("Start of while loop: %s", compressfield))
>             GenericRecord currRecord = reader.next()
>             def SCH = currRecord.getSchema()
>
>             def fields = SCH.getFields()
>             if (fields.isEmpty() ) {
>                log.warn("No fields found")
>             } else {
>                fields.each { field ->
>                   def fieldName = field.name()
>                   log.warn("Field name : ${field.name()}")
>                }
>             }
>             log.warn("Got next record")
>             //int I = currRecord.get("event_id")
>             //log.warn(String.format(" blob_id direct %d ", I))
>
>             int CompressionCode = currRecord.get(compressfield as String)
>             log.warn(String.format("Got compression code"))
>             ByteBuffer sblob = currRecord.get(docfield as String)
>             if (CompressionCode == 728) {
>                // action normally here
>             } else if (CompressionCode == 727) {
>                 // this blob isn't compressed - strip the suffix
>               // action without decompression.
>             } else {
>                 log.error('Unknown compression code')
>             }
>             writer.append(currRecord)
>         }
>         // Create a new record
>         //   GenericRecord newRecord = new GenericData.Record(schema)
>         // populate the record with data
>         //   newRecord.put("address", new org.apache.avro.util.Utf8("My street"))
>         // Append a new record to avro file
>         //   writer.append(newRecord)
>         //writer.appendAllFrom(reader, false)
>         // do not forget to close the writer
>         writer.close()
>
>     } as StreamCallback)
>
>     session.transfer(flowFile, REL_SUCCESS)
> } catch(e) {
>     log.error('Error appending new record to avro file', e)
>     flowFile = session.penalize(flowFile)
>     session.transfer(flowFile, REL_FAILURE)
> }
>
