Hi Mark,

Thanks for the pointer.

My end goal is to write batches of records into a columnar datastore [1]. I had seen the RecordTransform in the Python Developer Guide, but I wasn't sure whether performance would be an issue, because I thought I would need to query the schema. Looking at the PutChroma example [2], I think I'll try a similar approach: make it a requirement that incoming data is JSON, and convert that directly to pyarrow for saving into Vast DB.

Best regards,
Chris

[1] https://vastdb-sdk.readthedocs.io/en/v1.1.0/
[2] https://github.com/apache/nifi-python-extensions/blob/main/src/extensions/vectorstores/PutChroma.py

On Sat, 24 Aug 2024 at 20:04, Mark Payne <marka...@hotmail.com> wrote:

> Hey Chris,
>
> If you haven’t already, I’d recommend taking a look at the Python
> Developer’s Guide [1], particularly the section on RecordTransforms.
> In short, you should not need a Record Reader in Python. Instead of
> FlowFileTransform, you would extend from RecordTransform.
> Then, your transform method doesn’t receive a FlowFile but rather it
> receives a Record (in the form of a Python dict).
>
> Thanks
> -Mark
>
> [1] https://github.com/apache/nifi/blob/main/nifi-docs/src/main/asciidoc/python-developer-guide.adoc#recordtransform
>
> On Aug 24, 2024, at 11:13 AM, chris snow <chsnow...@gmail.com> wrote:
>
> It seems I would need access to the ProcessSession, but from what I can
> understand from Java processors, that is passed in via the on_trigger method,
> which doesn't appear to have been implemented for Python processors?
>
> On Sat, 24 Aug 2024 at 08:22, chris snow <chsnow...@gmail.com> wrote:
>
>> I have a Python component that uses a controller service. I can't
>> figure out from the Java API docs [1] and Python API source code [2] how to
>> retrieve the record reader.
>>
>> Any suggestions?
>>
>> from nifiapi.properties import PropertyDescriptor, PythonPropertyValue
>> from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
>>
>> class PythonReaderServiceExample(FlowFileTransform):
>>     class Java:
>>         implements = ['org.apache.nifi.python.processor.FlowFileTransform']
>>
>>     class ProcessorDetails:
>>         dependencies = []  # ['debugpy']
>>         version = '0.0.1-SNAPSHOT'
>>
>>     def __init__(self, **kwargs):
>>         self.RECORD_READER = PropertyDescriptor(
>>             name="Record Reader",
>>             description="Controller Service to use for parsing incoming data into Records.",
>>             required=True,
>>             controller_service_definition="org.apache.nifi.serialization.RecordReaderFactory")
>>
>>         self.descriptors = [self.RECORD_READER]
>>
>>     def getPropertyDescriptors(self):
>>         return self.descriptors
>>
>>     def transform(self, context, flowFile):
>>         controllerService = context.getProperty("Record Reader").asControllerService()
>>
>>         ##########################################################################
>>         # recordReader = controllerService.??? # How to retrieve the RecordReader?
>>         ##########################################################################
>>
>>         return FlowFileTransformResult(relationship="success")
>>
>> Thanks!
>>
>> [1] https://www.javadoc.io/doc/org.apache.nifi/nifi-record-serialization-service-api/latest/org/apache/nifi/serialization/RecordReaderFactory.html
>> [2] https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M4/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
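
For anyone following the thread later: the approach described in the most recent reply (require JSON input, then convert it to pyarrow) could look roughly like the sketch below. This is only an illustration under stated assumptions, not code from NiFi or the Vast DB SDK. It assumes the FlowFile content is a JSON array of objects, a single JSON object, or newline-delimited JSON. The function names parse_json_records, records_to_columns and to_arrow_table are hypothetical. The only third-party call is pyarrow.Table.from_pydict, imported lazily so the rest runs without pyarrow installed. Wiring this into a processor's transform method and writing the table to Vast DB are not shown.

```python
import json


def parse_json_records(payload: bytes) -> list:
    """Parse raw content into a list of dict records.

    Assumption: the content is a JSON array of objects, a single JSON
    object, or newline-delimited JSON (one object per line).
    """
    text = payload.decode("utf-8").strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Not a single JSON document: fall back to newline-delimited JSON.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    return parsed if isinstance(parsed, list) else [parsed]


def records_to_columns(records: list) -> dict:
    """Pivot row-oriented records into column-oriented lists.

    Keys missing from a record are filled with None so that every
    column ends up with the same length.
    """
    names = []
    for record in records:
        for key in record:
            if key not in names:
                names.append(key)
    return {name: [record.get(name) for record in records] for name in names}


def to_arrow_table(columns: dict):
    """Build a pyarrow Table from the columns (requires pyarrow)."""
    import pyarrow as pa  # lazy import: only this step needs pyarrow

    return pa.Table.from_pydict(columns)


if __name__ == "__main__":
    # Hypothetical sample payload standing in for FlowFile content.
    sample = b'{"id": 1, "name": "a"}\n{"id": 2}\n'
    print(records_to_columns(parse_json_records(sample)))
```

Letting pyarrow infer the column types from the values avoids querying a schema up front, which was the performance concern raised above. The trade-off is that inference can fail or pick an unexpected type when a column mixes value types across records, so an explicit pyarrow schema may still be worth passing in practice.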