Hey Chris,

If you haven’t already, I’d recommend taking a look at the Python Developer’s 
Guide [1], particularly the section on RecordTransforms.
In short, you should not need a Record Reader in Python. Instead of 
FlowFileTransform, you would extend from RecordTransform.
Then, your transform method doesn’t receive a FlowFile but rather it receives a 
Record (in the form of a Python dict).

Thanks
-Mark



[1] 
https://github.com/apache/nifi/blob/main/nifi-docs/src/main/asciidoc/python-developer-guide.adoc#recordtransform


On Aug 24, 2024, at 11:13 AM, chris snow <chsnow...@gmail.com> wrote:

It seems I would need access to the ProcessSession but from what I can 
understand from Java processors that is passed in via the on_trigger method 
which doesn't appear to have been implemented for Python processors?

On Sat, 24 Aug 2024 at 08:22, chris snow 
<chsnow...@gmail.com<mailto:chsnow...@gmail.com>> wrote:
I have a python component that users a controller service.  I can't figure out 
from the java api docs [1] and python api source code [2] how to retrieve the 
record reader.

Any suggestions?

from nifiapi.properties import PropertyDescriptor, PythonPropertyValue
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class PythonReaderServiceExample(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']

class ProcessorDetails:
dependencies = [] # ['debugpy']
version = '0.0.1-SNAPSHOT'

def __init__(self, **kwargs):
self.RECORD_READER = PropertyDescriptor(
name="Record Reader",
description="Controller Service to use for parsing incoming data into Records.",
required=True,
controller_service_definition="org.apache.nifi.serialization.RecordReaderFactory")

self.descriptors = [self.RECORD_READER]


def getPropertyDescriptors(self):
return self.descriptors
def transform(self, context, flowFile):
controllerService = context.getProperty("Record Reader").asControllerService()

##########################################################################
# recordReader = controllerService.??? # How to retrieve the RecordReader?
##########################################################################

return FlowFileTransformResult(relationship = "success")

Thanks!

[1] 
https://www.javadoc.io/doc/org.apache.nifi/nifi-record-serialization-service-api/latest/org/apache/nifi/serialization/RecordReaderFactory.html
[2] 
https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M4/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py

Reply via email to