Hi Bas,

Sorry for the late reply.

Thanks for the clarification, I oversimplified the flow. As you
experienced, NiFi back pressure is handled per relationship, and as
long as a relationship has room to receive new FlowFiles, the source
processor is scheduled to run.
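For example, with a back pressure threshold of 1 on both connections in

GetSQS -- success --> FetchS3Object -- success --> Parse

GetSQS becomes schedulable again as soon as the GetSQS -> FetchS3Object
connection is empty, even while Parse is still busy, because only the
immediate downstream connection is checked (which, if I read your
diagram right, is exactly what you are seeing).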

I don't think there's an existing way to block the source processor
the way you want.

However, I found a possible improvement that could achieve it.
The NiFi scheduler checks downstream relationship availability; when a
downstream relationship is full, the processor won't be scheduled to run.
If a source processor has multiple outgoing relationships and ANY of
them is full, the processor won't be scheduled.

(This is how processor scheduling works with back pressure by default,
but it can be altered with the @TriggerWhenAnyDestinationAvailable
annotation; DistributeLoad is currently the only processor annotated
with it.)
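To illustrate, here is a rough sketch (not the actual DistributeLoad
source; the class name and body are made up) of how a processor opts
into that behavior:

  import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
  import org.apache.nifi.processor.AbstractProcessor;
  import org.apache.nifi.processor.ProcessContext;
  import org.apache.nifi.processor.ProcessSession;
  import org.apache.nifi.processor.exception.ProcessException;

  // With this annotation the framework schedules the processor as long
  // as at least one destination relationship has room, instead of
  // requiring all of them to have room (the default).
  @TriggerWhenAnyDestinationAvailable
  public class MyDistributingProcessor extends AbstractProcessor {
      @Override
      public void onTrigger(ProcessContext context, ProcessSession session)
              throws ProcessException {
          // route each incoming FlowFile to whichever relationship
          // still has capacity
      }
  }

GetSQS and the other source processors don't carry this annotation, so
the default "all destinations must have room" rule applies to them, and
that is what the flow below relies on.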

So I think we could use this mechanism to hold off scheduling the
source processor, with a flow like the following:

GetSQS
  -- success --> FetchS3Object --> Parse --> Notify
  -- success --> Wait

I propose improving Wait so that the user can choose how a waiting
FlowFile is handled, from either:
"Route to 'wait' relationship" or "Keep in the upstream connection".
Currently the only option is to route to 'wait'.
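Roughly, I imagine the new property looking something like this inside
the Wait processor (just a sketch, the names and values are not final
and nothing is implemented yet):

  import org.apache.nifi.components.AllowableValue;
  import org.apache.nifi.components.PropertyDescriptor;

  // Sketch of a "Wait Mode" property (would live inside the Wait
  // processor class; identifiers are illustrative only).
  public static final AllowableValue MODE_TRANSFER_TO_WAIT = new AllowableValue(
          "wait", "Route to 'wait' relationship",
          "Transfer a waiting FlowFile to the 'wait' relationship (current behavior).");
  public static final AllowableValue MODE_KEEP_IN_UPSTREAM = new AllowableValue(
          "keep", "Keep in the upstream connection",
          "Leave a waiting FlowFile in the upstream connection so that"
                  + " back pressure holds the source processor.");
  public static final PropertyDescriptor WAIT_MODE = new PropertyDescriptor.Builder()
          .name("wait-mode")
          .displayName("Wait Mode")
          .description("Specifies how a FlowFile waiting for its release signal is handled.")
          .allowableValues(MODE_TRANSFER_TO_WAIT, MODE_KEEP_IN_UPSTREAM)
          .defaultValue(MODE_TRANSFER_TO_WAIT.getValue())
          .required(true)
          .build();

Defaulting to the current behavior would keep existing flows unchanged.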

Use "Keep in the Upstream connection" Wait mode with the flow above,
the incoming flow file in GetSQS -> Wait connection stays there until
actual data processing finishes and Notify sends a notification
signal.

I will experiment with this idea, and if it works, I'll submit a JIRA
and try to add this capability, since I've been working on the
Wait/Notify processors recently.

Thanks again for sharing your use-case!

Koji


On Tue, Feb 7, 2017 at 6:05 PM, Bas van Kortenhof
<bas.vankorten...@sanoma.com> wrote:
> Hi Koji,
>
> Thanks for the quick response. I have set the batch size to 1 indeed, and
> the flow you describe works, but my problem is a bit more complex. I'll try
> to show it with an example:
>
>
>
> In this case Node 1 is parsing a flow file (indicated by the X in the
> connection between FetchS3Object and Parse). Both connections have a
> backpressure threshold of 1, but because the object is already fetched, the
> first connection is empty and can thus be filled. This means that, if a new
> item becomes available in the queue, both of the following cases can happen
> with equal probability:
>
>
>
> I'd like to force the second case to happen, because node 2 has more
> resources available.
>
> I hope this explains the situation a bit better. So basically I want the
> backpressure to occur based on a threshold on the whole flow, not an
> individual connection. I haven't found a way to do this up to this point.
>
> Hopefully you have an idea how to achieve this.
>
> Regards,
> Bas
>
>
>
> --
> View this message in context: 
> http://apache-nifi-users-list.2361937.n4.nabble.com/Problem-when-using-backpressure-to-distribute-load-over-nodes-in-a-cluster-tp863p877.html
> Sent from the Apache NiFi Users List mailing list archive at Nabble.com.
