Thanks to your example I just learned something I didn't know. Apparently the 
Wait Processor actually deletes the key from the distribute cache when it let's 
a flowfile through. That explains a lot of weirdness I'd been seeing. I was 
operating under the impression only the Notify Processor changed the contents 
of the cache.

-----Original Message-----
From: Koji Kawamura <ijokaruma...@gmail.com> 
Sent: Wednesday, April 24, 2019 7:11 PM
To: users@nifi.apache.org
Subject: Re: Implementing Gates with the Wait and Notify Processors

Hi Shawn,

I have done similar flow before, using Wait/Notify processors to implement a 
gate.
Here is an example flow.
https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244

To implement atomic locks Wait/Notify processors use Cache clients implementing 
AtomicDistributedMapCacheClient, that provides atomic 'replace' method using 
optimistic lock when Wait/Notify updates its signal in the cache.
For a NiFi cluster wide atomic Wait/Notify, I'd use CouchbaseMapCacheClient or 
RedisMapCacheClient with an external cache server, so that each NiFi node in a 
same cluster can use the same cache key spaces and handling NiFi cluster 
topology changes better.

I hope you find the example useful.

Thanks,
Koji

On Thu, Apr 25, 2019 at 1:23 AM Shawn Weeks <swe...@weeksconsulting.us> wrote:
>
> Maybe it's just the documentation that's way off. I would expect that if I 
> said wait for the counter to be 1 that it releases things when the counter is 
> 1 not when the counter is 2 or 5 or 10.
>
>  I guess what I really need is atomic locks in NiFi and I'm not really sure 
> how to build that out when things are clustered.
>
> Thanks
> Shawn
> ________________________________
> From: Bryan Bende <bbe...@gmail.com>
> Sent: Wednesday, April 24, 2019 9:25 AM
> To: users@nifi.apache.org
> Subject: Re: Implementing Gates with the Wait and Notify Processors
>
> I think the Wait/Notify processors may have changed a bit since I last 
> used them, but the original use case was for when a processor produces 
> two flow files, and you want to send one of them down one part of the 
> flow, and have the other wait until that part is done. The easiest 
> example to think of is a "split" processor that has an "original"
> relationship and "splits" relationship. You send the original to a 
> Wait, and send the splits down a part of the flow that ends with 
> Notify, so when all the splits have notified then the original is 
> released. Koji wrote a nice blog about this [1].
>
> I haven't tried to use Wait/Notify to create the kind of continuous 
> gate you want, so I'm not sure exactly how to do it, but thought it 
> might be helpful to see some background on how it can be used.
>
> [1] https://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/
>
> On Wed, Apr 24, 2019 at 10:06 AM Jerry Vinokurov <grapesmo...@gmail.com> 
> wrote:
> >
> > My understanding of how Wait/Notify worked was that the Wait processor 
> > would look for the count in the target signal to reach a specific value, at 
> > which point it would open and let through any flowfiles that were waiting. 
> > I'm not sure if the target value that it's looking for is actually a delta 
> > between whatever the cached value is and some internally-stored state that 
> > Wait maintains; it would make sense to me if that were the case because it 
> > would eliminate the need to keep resetting the counter. But I'm fairly 
> > certain that this will not work the way you describe it, where the 
> > running_count property has to be set to something on the flowfile in order 
> > for the flowfile to go through.
> >
> > On Tue, Apr 23, 2019 at 10:39 PM Shawn Weeks <swe...@weeksconsulting.us> 
> > wrote:
> >>
> >> Running into some additional inconsistencies. I’m under the impression 
> >> that only the Notify Processor can increment the counter. In example I’m 
> >> testing I have Notify that sets the signal count “running_count” to zero 
> >> and then immediately after it I have a Notify that increments by 2 yet 
> >> when I run the groovy cache dumps script on 
> >> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html I can see 
> >> “running_count” may be set to something much higher like 50 or 60. Another 
> >> seems to be that the Wait Processor lets things through that don’t match 
> >> the Target Signal Count. For example if the target signal count is 1 then 
> >> it’s letting things through that have a target signal count of 2? I’ve got 
> >> to be missing something rather obvious.
> >>
> >>
> >>
> >> Thanks
> >>
> >> Shawn
> >>
> >>
> >>
> >> From: Shawn Weeks <swe...@weeksconsulting.us>
> >> Sent: Tuesday, April 23, 2019 8:32 PM
> >> To: users@nifi.apache.org
> >> Subject: Implementing Gates with the Wait and Notify Processors
> >>
> >>
> >>
> >> I’m working to implement a flow where for a given source of data I can 
> >> only be processing one set at a time due to external dependencies. Each 
> >> set needs to go through several different steps so this isn’t just a 
> >> matter of limiting concurrency for a single processor. I’m trying to 
> >> implement this using Wait and Notify as a gate and I’ve ran into a couple 
> >> of limitations that I’m not sure how to get around. I first set my Wait 
> >> processors to wait for a specific counter to be reset to zero before 
> >> allowing a data source through but I quickly discovered that the Wait 
> >> Processors tries to divide the signal counter leading to a divide by zero 
> >> error. I’m assuming if zero isn’t a valid value for the signal counter we 
> >> should disallow it however since you can’t use the Notify processor to set 
> >> arbitrary values other than zero I’m not sure how your supposed to make a 
> >> 0 or 1 gate. Are you supposed to have two Notify Processors back to back 
> >> where one resets the counter to zero and the next increments by one? That 
> >> seems a bit clunky.
> >>
> >>
> >>
> >> Thoughts?
> >>
> >>
> >>
> >> Thanks
> >>
> >> Shawn Weeks
> >
> >
> >
> > --
> > http://www.google.com/profiles/grapesmoker

Reply via email to