I am having great difficulty getting a stream callback in Python to simply add a new attribute, with a value, to a flowfile. My code runs inside an ExecuteScript processor. My incoming flowfile has a number of complex attributes that contain embedded data of high interest to my users. The attribute REMARKS is one example: it contains critical information that I parse out using regex in Python. UpdateAttribute can add and modify attributes in simple cases, but it does not let me perform all of the complex regex processing my requirements call for.
My code below successfully parses the existing attribute, but it saves the value(s) as the new data payload of my flowfile, not as a new attribute. I must save the parsed result as a new attribute on the flowfile and leave the flowfile's data payload unchanged. How can I do this? This seems like such a fundamental feature of common interest that I have to believe I am missing the obvious.

(My code below was developed based on an example originally offered by Matt B. I want to give him credit for his examples and thank him for getting me started.) The code was retyped by hand, so please pardon any transcription slips.

    import json
    import re
    import java.io
    import csv
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    result = {}  # define a dictionary

    def isNotEmpty(s):
        return bool(s and s.strip())

    def parseEmbeddedColor(s):
        pattern = re.compile(r""".*COLOR\= (?P<m1>.*?) \/\/ .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)
        match = pattern.match(s)
        if match is None:
            return ''
        thisMatch = match.group("m1")
        if thisMatch:
            return thisMatch
        else:
            return ''

    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass

        def process(self, inputStream, outputStream):
            if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
                incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
            else:
                incoming_metadata_comment = ''
            origColor = parseEmbeddedColor(incoming_metadata_comment)
            if isNotEmpty(origColor):
                result['origColor'] = origColor
            else:
                result['origColor'] = ''
            outputStream.write(unicode(json.dumps(result['origColor'])))

    flowFile = session.get()
    if flowFile is not None:
        # The following line is all I can get to work currently. It is not what I need:
        # it replaces the flowFile data payload…
        flowFile = session.write(flowFile, PyStreamCallback())
        # I made an attempt to add the new parsed color value as an attribute here, but failed…
        # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
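
The parsing step can be checked on its own, outside NiFi. The sketch below is a standalone version that uses the same regular expression as parseEmbeddedColor. The name parse_embedded_color and the sample REMARKS strings are made up for illustration and are not taken from real flowfile data.

```python
import re

# Same pattern as parseEmbeddedColor in the script. With re.VERBOSE the literal
# spaces inside the pattern are ignored, so it matches "COLOR=<value>//".
COLOR_PATTERN = re.compile(
    r""".*COLOR\= (?P<m1>.*?) \/\/ .*""",
    re.IGNORECASE | re.DOTALL | re.VERBOSE,
)


def parse_embedded_color(remarks):
    """Return the text between 'COLOR=' and the next '//', or '' if absent."""
    if not remarks or not remarks.strip():
        return ''
    match = COLOR_PATTERN.match(remarks)
    if match is None:
        return ''
    return match.group('m1') or ''


if __name__ == '__main__':
    # Hypothetical REMARKS value, for illustration only.
    sample = 'UNIT=7//COLOR=RED//SIZE=L'
    print(parse_embedded_color(sample))
```

The function returns a plain string. Inside ExecuteScript, that is the kind of value that session.putAttribute(flowFile, key, value) takes as its third argument: it expects a string there, not a callback object. Whether that alone resolves the problem described above has not been tested here.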