Vivek,

To add to what Bryan said: if you send a batch to MarkLogic and it fails,
the standard pattern is to route all of the flowfiles that were part of the
batch to a failure relationship. Depending on the error, you might also
want to support a "retry" relationship for situations like timeouts or
network connectivity issues, which don't indicate that MarkLogic tried and
failed to accept the documents. This applies to both types of Put
processor, record-based or not.
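A minimal sketch of that all-or-nothing routing, in plain Java with no NiFi dependency so it runs on its own. `Route`, `BatchWriter` and `BatchRouter` are hypothetical names, and treating only `SocketTimeoutException` and `ConnectException` as retryable is an assumption; a real processor would choose its own list. In NiFi itself, the returned route would map to a `Relationship`, and every flow file in the batch would be sent there with `session.transfer(flowFiles, relationship)`.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.List;

// Hypothetical outcome for a whole batch; the names mirror typical NiFi relationships.
enum Route { SUCCESS, FAILURE, RETRY }

// Hypothetical stand-in for the utility that writes one batch to MarkLogic.
interface BatchWriter<T> {
    void write(List<T> batch) throws IOException;
}

final class BatchRouter {
    private BatchRouter() {}

    // Assumption: timeouts and refused connections mean MarkLogic never judged the documents.
    static boolean isTransient(Exception e) {
        return e instanceof SocketTimeoutException || e instanceof ConnectException;
    }

    // Sends the batch once and returns the single route that applies to every item in it.
    static <T> Route writeAndRoute(List<T> batch, BatchWriter<T> writer) {
        try {
            writer.write(batch);
            return Route.SUCCESS;
        } catch (Exception e) {
            // All items share one outcome: retry for transient errors, failure otherwise.
            return isTransient(e) ? Route.RETRY : Route.FAILURE;
        }
    }
}
```

Because the route is decided once per batch, a single rejected document sends the whole batch to failure, which is the trade-off of batching that the paragraph above describes.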

Earlier I suggested that you build two separate processors, and I still
think that's the right move for you here. It would help you fully support
NiFi's record reader capabilities and also help you break out your use
cases into smaller, more focused processors.

On Tue, May 1, 2018 at 4:14 PM Bryan Bende wrote:

> Hello,
>
> I think to store off the flow files you would also need to store the
> session they came from, but I would probably question whether this is
> really the best idea...
>
> What type of data are you expecting to come into your processor?
>
> 1) If you can leverage the record reader concept in NiFi, this would be
> the best case...
>
> To leverage it, your expected data would need to fit into one of the
> available record readers (JSON, CSV, Avro, etc.), or, if you have a
> custom data format, you could implement a record reader for your
> format.
>
> Your PutMarkLogic processor would have a record reader service, which
> it would use to read records from an incoming flow file, converting
> each record to whatever object you need to send to MarkLogic.
>
> This leaves the batching up to the dataflow designer... if someone
> sends in a flow file with 1 record then you send 1 record... if
> someone sends in a flow file with 1k records, then you send 1k
> records.
>
> Here is an example from PutElasticsearchHttpRecord:
>
>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java#L316-L317
>
>
> 2) You could call session.get(batchSize) to obtain a batch of flow
> files and send this batch as one operation.
>
> The downside is that batchSize here is a maximum, which will only be
> reached if the queue already holds at least that many flow files when
> session.get is called; otherwise you may get as few as 1 flow file if
> that is all that is available.
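The "up to batchSize" behavior of 2) can be modeled with a plain queue. `QueueModel` is a hypothetical, simplified stand-in for an incoming connection, not NiFi code; in a processor the equivalent call is `List<FlowFile> flowFiles = session.get(batchSize);`, usually followed by an early return when the list is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a connection queue; NOT NiFi's implementation.
final class QueueModel<T> {
    private final Deque<T> queue = new ArrayDeque<>();

    void offer(T item) {
        queue.addLast(item);
    }

    // Mirrors the idea behind session.get(maxResults):
    // returns at most maxResults items, and fewer if fewer are queued.
    List<T> get(int maxResults) {
        List<T> taken = new ArrayList<>();
        while (taken.size() < maxResults && !queue.isEmpty()) {
            taken.add(queue.pollFirst());
        }
        return taken;
    }
}
```

Asking for 100 when only 3 are queued returns 3 immediately rather than waiting, which is why the batch size in this approach is only an upper bound.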
>
>
> Let us know if this doesn't make sense.
>
> -Bryan
>
>
>
> On Tue, May 1, 2018 at 3:20 PM, Vivek Muniyandi wrote:
> > Hi, Esteemed NiFi Developers:
> >
> > We are creating processors for ingesting data into and exporting data
> > out of MarkLogic (see Jira ticket NIFI-5102
> > <https://issues.apache.org/jira/browse/NIFI-5102>). We are new to the
> > NiFi community and have been catching up on the code.
> >
> >
> > Can we accumulate the FlowFiles we receive into batches and transfer
> > them to a relationship at a later point in time? We use a utility we
> > built to write data to MarkLogic in batches, and we won't know whether
> > a batch succeeded or failed until the batch (of FlowFiles) is full and
> > has been sent to MarkLogic for processing. I am not able to defer the
> > transfer of a FlowFile to a relationship until later: I get an
> > exception saying that the FlowFile was not transferred to a
> > relationship, or that the FlowFile is not known to the ProcessSession.
> > Is the onTrigger method synchronous? That is, do we need to determine
> > the relationship for each FlowFile within that method, or can it be
> > done asynchronously? If it can be done, do we need to store the mapping
> > between the ProcessSession and the FlowFile to transfer?
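A toy model of why deferring the transfer fails. For a processor extending `AbstractProcessor`, the framework commits the session right after `onTrigger` returns, and every flow file obtained from that session must have been transferred or removed by then. `ToySession` is hypothetical and only mimics that bookkeeping; it is not NiFi's `ProcessSession`, and its error messages are not NiFi's.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a session's bookkeeping; hypothetical, not NiFi's API.
final class ToySession {
    // flowFileId -> relationship name (null until the flow file is transferred)
    private final Map<String, String> routed = new HashMap<>();

    String get(String flowFileId) {
        routed.put(flowFileId, null);
        return flowFileId;
    }

    void transfer(String flowFileId, String relationship) {
        if (!routed.containsKey(flowFileId)) {
            // e.g. a flow file kept from an earlier, already committed session
            throw new IllegalStateException(flowFileId + " is not known to this session");
        }
        routed.put(flowFileId, relationship);
    }

    void commit() {
        for (Map.Entry<String, String> e : routed.entrySet()) {
            if (e.getValue() == null) {
                throw new IllegalStateException(e.getKey() + " has no relationship");
            }
        }
        routed.clear(); // after commit, the session no longer tracks these flow files
    }
}
```

Holding a flow file past commit and transferring it on a later trigger hits the "not known" branch, while committing without routing it hits the other one; both correspond to the two errors described in the question.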
> >
> > Thanks in advance!
> >
> > Regards,
> > Vivek
>
