Hi Mark,

Thanks for the pointer.

My end goal is to write batches of records into a columnar datastore [1].
I had seen RecordTransform in the Python Developer Guide, but I wasn't sure
whether performance would be an issue, because I thought I would need to
query the schema.
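
For comparison, with RecordTransform each record reaches `transform` as a
plain Python dict, so simple field access needs no schema lookup. The sketch
below keeps the per-record logic in a plain function. The required field
names and the "success"/"failure" routing rule are hypothetical; inside the
processor the returned pair would be wrapped in a `RecordTransformResult`.

```python
from typing import Any, Dict, Iterable, Tuple


def route_record(record: Dict[str, Any],
                 required_fields: Iterable[str]) -> Tuple[str, Dict[str, Any]]:
    """Pick a relationship for one record (a plain dict) and return it unchanged.

    Hypothetical rule: a record that lacks any required field, or holds None
    for it, goes to "failure"; every other record goes to "success".
    """
    missing = [f for f in required_fields if record.get(f) is None]
    if missing:
        return "failure", record
    return "success", record


if __name__ == "__main__":
    print(route_record({"id": 1, "name": "a"}, ["id", "name"]))
    # ('success', {'id': 1, 'name': 'a'})
    print(route_record({"id": 2}, ["id", "name"]))
    # ('failure', {'id': 2})
```

Keeping the logic outside the processor class means it can be unit-tested
without a running NiFi instance.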

Looking at the PutChroma example [2], I think I'll try a similar approach:
require that incoming data is JSON and convert it directly to pyarrow for
saving into Vast DB.
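
A rough sketch of the JSON-to-columnar step in plain Python, assuming the
FlowFile content is a UTF-8 JSON array of flat objects. Inside a
FlowFileTransform the bytes could come from `flowFile.getContentsAsBytes()`,
and the resulting column dict is the shape `pyarrow.Table.from_pydict`
accepts. The function name is hypothetical, and the Vast DB write itself is
left out because its API isn't covered in this thread.

```python
import json
from typing import Any, Dict, List


def json_records_to_columns(content: bytes) -> Dict[str, List[Any]]:
    """Pivot a JSON array of flat objects into a column-oriented dict.

    Keys missing from a record are filled with None so that every column
    has exactly one entry per record.
    """
    records = json.loads(content.decode("utf-8"))
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of objects")

    # Collect column names in first-seen order across all records.
    columns: Dict[str, List[Any]] = {}
    for record in records:
        if not isinstance(record, dict):
            raise ValueError("expected every array element to be an object")
        for key in record:
            if key not in columns:
                columns[key] = []

    # Fill each column row by row, using None where a key is absent.
    for record in records:
        for key, values in columns.items():
            values.append(record.get(key))
    return columns


if __name__ == "__main__":
    data = b'[{"id": 1, "name": "a"}, {"id": 2, "score": 0.5}]'
    print(json_records_to_columns(data))
    # {'id': [1, 2], 'name': ['a', None], 'score': [None, 0.5]}
```

Pivoting once per FlowFile keeps the whole batch together, which matches the
goal of writing batches rather than single records.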

Best regards,

Chris

[1] https://vastdb-sdk.readthedocs.io/en/v1.1.0/
[2] https://github.com/apache/nifi-python-extensions/blob/main/src/extensions/vectorstores/PutChroma.py



On Sat, 24 Aug 2024 at 20:04, Mark Payne <marka...@hotmail.com> wrote:

> Hey Chris,
>
> If you haven’t already, I’d recommend taking a look at the Python
> Developer’s Guide [1], particularly the section on RecordTransforms.
> In short, you should not need a Record Reader in Python. Instead of
> FlowFileTransform, you would extend from RecordTransform.
> Then, your transform method doesn’t receive a FlowFile but rather it
> receives a Record (in the form of a Python dict).
>
> Thanks
> -Mark
>
>
>
> [1] https://github.com/apache/nifi/blob/main/nifi-docs/src/main/asciidoc/python-developer-guide.adoc#recordtransform
>
>
> On Aug 24, 2024, at 11:13 AM, chris snow <chsnow...@gmail.com> wrote:
>
> It seems I would need access to the ProcessSession, but from what I can
> understand from Java processors, that is passed in via the onTrigger method,
> which doesn't appear to be implemented for Python processors?
>
> On Sat, 24 Aug 2024 at 08:22, chris snow <chsnow...@gmail.com> wrote:
>
>> I have a Python component that uses a controller service. I can't
>> figure out from the Java API docs [1] and the Python API source code [2]
>> how to retrieve the record reader.
>>
>> Any suggestions?
>>
>> from nifiapi.properties import PropertyDescriptor, PythonPropertyValue
>> from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
>>
>> class PythonReaderServiceExample(FlowFileTransform):
>>     class Java:
>>         implements = ['org.apache.nifi.python.processor.FlowFileTransform']
>>
>>     class ProcessorDetails:
>>         dependencies = []  # ['debugpy']
>>         version = '0.0.1-SNAPSHOT'
>>
>>     def __init__(self, **kwargs):
>>         self.RECORD_READER = PropertyDescriptor(
>>             name="Record Reader",
>>             description="Controller Service to use for parsing incoming data into Records.",
>>             required=True,
>>             controller_service_definition="org.apache.nifi.serialization.RecordReaderFactory")
>>
>>         self.descriptors = [self.RECORD_READER]
>>
>>     def getPropertyDescriptors(self):
>>         return self.descriptors
>>
>>     def transform(self, context, flowFile):
>>         controllerService = context.getProperty("Record Reader").asControllerService()
>>
>>         ##########################################################################
>>         # recordReader = controllerService.???  # How to retrieve the RecordReader?
>>         ##########################################################################
>>
>>         return FlowFileTransformResult(relationship="success")
>>
>> Thanks!
>>
>> [1] https://www.javadoc.io/doc/org.apache.nifi/nifi-record-serialization-service-api/latest/org/apache/nifi/serialization/RecordReaderFactory.html
>> [2] https://github.com/apache/nifi/blob/rel/nifi-2.0.0-M4/nifi-extension-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/properties.py
>>
>
>
