I am having great difficulty getting a stream callback in Python code to
simply add a new attribute, with a value, to a flowfile. I execute my code
within an ExecuteScript processor. My incoming flowfile has a number of
complex attributes that contain embedded data of high interest to my
users. An attribute entitled REMARKS is one such example, containing
critical information that I parse out using regex in Python. While simple
cases may allow me to use UpdateAttribute to add and modify attributes, I
find that it does not allow me to perform all the complex regex parsing I
anticipate for my requirements.
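
To illustrate the kind of parsing involved, here is a minimal standalone sketch that runs outside NiFi. The sample REMARKS value and the names `COLOR_PATTERN`, `parse_color`, and `sample_remarks` are invented for illustration; the real attribute content may be laid out differently. It assumes the color sits between `COLOR=` and the next `//`.

```python
import re

# Verbose pattern: the text between "COLOR=" and the next "//".
COLOR_PATTERN = re.compile(r"""
    .*COLOR=          # skip ahead to the COLOR= marker
    (?P<color>.*?)    # capture lazily ...
    //                # ... up to the next double slash
    """, re.IGNORECASE | re.DOTALL | re.VERBOSE)


def parse_color(remarks):
    """Return the embedded color, or '' if remarks is empty or has no match."""
    if not remarks:
        return ''
    match = COLOR_PATTERN.match(remarks)
    return match.group('color') if match else ''


# Hypothetical REMARKS value, invented for illustration.
sample_remarks = "ID=42//COLOR=Dark Red//SIZE=L"
print(parse_color(sample_remarks))
```

Note that this pattern only matches when the color value is followed by `//`, so a value at the very end of the string without a trailing `//` yields an empty result.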



My code below successfully parses the existing attribute, but it saves the
value(s) as a new data payload of my flowfile – not as a new attribute. I
must save the parsed result as a new attribute in the flowfile, and must
leave my flowfile data payload unchanged. How can I do this? This seems
like such a fundamental feature of common interest, and so I have to
believe I am missing the obvious.



(My code below was developed based on an example originally offered by Matt
B. I want to give him credit for his examples and thank him for getting me
started).



Please pardon single quote characters formatted improperly in Word by my
code retyping efforts below.



import json
import re
import java.io
import csv
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback



result = {}  # define a dictionary

def isNotEmpty(s):
    return bool(s and s.strip())


def parseEmbeddedColor(s):
    # Capture the text between "COLOR=" and the next "//" (case-insensitive).
    pattern = re.compile(r""".*COLOR\=
                             (?P<m1>.*?)
                             \/\/
                             .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)
    match = pattern.match(s)
    if match is None:
        return ''
    thisMatch = match.group("m1")
    if thisMatch:
        return thisMatch
    else:
        return ''



class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # flowFile is the script-level variable assigned further down.
        if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
            incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
        else:
            incoming_metadata_comment = ''
        origColor = parseEmbeddedColor(incoming_metadata_comment)
        if isNotEmpty(origColor):
            result['origColor'] = origColor
        else:
            result['origColor'] = ''
        # This writes the parsed value into the flowfile content.
        outputStream.write(unicode(json.dumps(result['origColor'])))



flowFile = session.get()
if flowFile is not None:
    # The following line is all I can get to work currently. It is not
    # what I need. It replaces the flowFile data payload...
    flowFile = session.write(flowFile, PyStreamCallback())

    # I made an attempt to add the new parsed color value as an attribute
    # here, but failed...
    # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())

    session.transfer(flowFile, REL_SUCCESS)
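
For reference, the behaviour being asked for (a new attribute, payload untouched) might look like the sketch below. It is untested inside NiFi and rests on one assumption: that `ProcessSession.putAttribute(flowFile, key, value)` takes a plain string value and returns the updated flowfile, so no stream callback and no `session.write()` call is involved. The helper names `extract_color` and `add_parsed_color` are hypothetical, and the simple `re.search` here stands in for the more elaborate pattern above.

```python
import re


def extract_color(remarks):
    """Return the text between 'COLOR=' and the next '//', or ''."""
    match = re.search(r"COLOR=(.*?)//", remarks or '', re.IGNORECASE | re.DOTALL)
    return match.group(1) if match else ''


def add_parsed_color(session, flow_file):
    """Store the parsed color as the 'parsedColor' attribute.

    Only attributes are touched; the content is never written.
    """
    remarks = flow_file.getAttribute('message.REMARKS')
    # Assumption: putAttribute returns the updated flowfile reference.
    return session.putAttribute(flow_file, 'parsedColor', extract_color(remarks))


# ExecuteScript binds `session` and `REL_SUCCESS`; elsewhere they are
# undefined, so the script part is skipped.
try:
    nifi_session = session
except NameError:
    nifi_session = None

if nifi_session is not None:
    flowFile = nifi_session.get()
    if flowFile is not None:
        flowFile = add_parsed_color(nifi_session, flowFile)
        nifi_session.transfer(flowFile, REL_SUCCESS)
```

The point of the sketch is that the third argument to `putAttribute` is the parsed string itself rather than a callback object, which is where the commented-out attempt above differs.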
