Hi Sally,

I guess your three questions relate to each other, right?

So, from a single incoming queue you get FlowFiles with an attribute “count” 
that tells you how many more FlowFiles from the same queue you have to read 
and process together, right?

NiFi is highly parallelised and all components work as independently as 
possible. Your FlowFiles are therefore spread over all threads of a 
processor, and even for a single thread you cannot say which FlowFile will 
come next from your queue, as they are taken “fastest one first”. In 
particular, you cannot rely on the >first< FlowFile carrying information 
about the >next< FlowFiles, because the FlowFiles in a queue are not 
guaranteed to be in any particular order.

But what can you try?

I’d suggest trying one or more of the standard processors first. My best 
guess is MergeContent to put the FlowFiles together, followed by a second 
processor (maybe a scripted one) to do the processing. While testing 
MergeContent, make sure that only one thread is running, as this makes the 
behaviour easier to understand: in “Configure” of MergeContent, on the 
“Scheduling” tab, set “Concurrent Tasks” to 1. If you’re in a NiFi cluster 
environment, also set it to run on the “Primary Node”.

The MergeContent processor can handle incoming FlowFiles and bucket them 
together based on attributes in the FlowFiles.
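
As a rough sketch of such a setup, assuming the FlowFiles that belong 
together share a value in an attribute I'll call group.id (a made-up name, 
not something from your flow; use whatever identifies a group), the 
MergeContent properties could look something like this. The values are only 
placeholders, and the property names are written from memory, so please 
check them against the MergeContent documentation of your NiFi version:

```
Merge Strategy              : Bin-Packing Algorithm
Correlation Attribute Name  : group.id    (hypothetical attribute name)
Minimum Number of Entries   : 1
Maximum Number of Entries   : 1000
Max Bin Age                 : 30 sec      (placeholder value)
```

With bin-packing, the entry limits are fixed per processor, not per 
FlowFile. If the number of FlowFiles differs from group to group, as with 
your “count”, the “Defragment” merge strategy may be the better fit: as far 
as I know it reads the attributes fragment.identifier, fragment.index and 
fragment.count from the FlowFiles, so you would have to set those first, for 
example with an UpdateAttribute processor in front of MergeContent.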

One little thing about MergeContent that confuses a lot of people but has a 
simple workaround: make sure it has only one incoming queue. This also 
applies if you want to route failure back into MergeContent. In that case, 
put a Funnel in front of MergeContent and connect all incoming queues 
(including failure) to the Funnel. It then works as expected and documented.

cheers, Jboi

> On 28. Oct 2017, at 21:52, sally <sally.tkhilaishv...@gmail.com> wrote:
> 
> I have several FlowFiles in a success queue, each with an attribute count,
> and I have to transfer only count FlowFiles at a time. The value of count
> is different every time, so I always have to read it from a FlowFile (all
> FlowFiles in this success queue have this attribute). I have tried the
> logic below (my code) and several modifications of it, but I can't get it
> to work. Could you tell me what's wrong with this code?
> 
> import org.apache.commons.io.IOUtils
> import java.nio.charset.StandardCharsets
> import groovy.lang.*
> 
> def flowFile= session.get();
> if (!flowFile) return;
> int numb=(flowFile.getAttribute("count") as Double).round()
> def filename=flowFile.getAttribute("filename")
> 
> 
> def flowfileList = session.get(numb-1)
> 
> if(flowfileList.size() == numb-1) {
> session.transfer(flowfile,REL_SUCCESS);
> session.transfer(flowfileList,REL_SUCCESS);
> 
> }
> else{
> session.rollback()
> 
> }
> 
> 
> 
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
