Thanks to your example I just learned something I didn't know. Apparently the Wait Processor actually deletes the key from the distribute cache when it let's a flowfile through. That explains a lot of weirdness I'd been seeing. I was operating under the impression only the Notify Processor changed the contents of the cache.
-----Original Message----- From: Koji Kawamura <ijokaruma...@gmail.com> Sent: Wednesday, April 24, 2019 7:11 PM To: users@nifi.apache.org Subject: Re: Implementing Gates with the Wait and Notify Processors Hi Shawn, I have done similar flow before, using Wait/Notify processors to implement a gate. Here is an example flow. https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244 To implement atomic locks Wait/Notify processors use Cache clients implementing AtomicDistributedMapCacheClient, that provides atomic 'replace' method using optimistic lock when Wait/Notify updates its signal in the cache. For a NiFi cluster wide atomic Wait/Notify, I'd use CouchbaseMapCacheClient or RedisMapCacheClient with an external cache server, so that each NiFi node in a same cluster can use the same cache key spaces and handling NiFi cluster topology changes better. I hope you find the example useful. Thanks, Koji On Thu, Apr 25, 2019 at 1:23 AM Shawn Weeks <swe...@weeksconsulting.us> wrote: > > Maybe it's just the documentation that's way off. I would expect that if I > said wait for the counter to be 1 that it releases things when the counter is > 1 not when the counter is 2 or 5 or 10. > > I guess what I really need is atomic locks in NiFi and I'm not really sure > how to build that out when things are clustered. > > Thanks > Shawn > ________________________________ > From: Bryan Bende <bbe...@gmail.com> > Sent: Wednesday, April 24, 2019 9:25 AM > To: users@nifi.apache.org > Subject: Re: Implementing Gates with the Wait and Notify Processors > > I think the Wait/Notify processors may have changed a bit since I last > used them, but the original use case was for when a processor produces > two flow files, and you want to send one of them down one part of the > flow, and have the other wait until that part is done. The easiest > example to think of is a "split" processor that has an "original" > relationship and "splits" relationship. You send the original to a > Wait, and send the splits down a part of the flow that ends with > Notify, so when all the splits have notified then the original is > released. Koji wrote a nice blog about this [1]. > > I haven't tried to use Wait/Notify to create the kind of continuous > gate you want, so I'm not sure exactly how to do it, but thought it > might be helpful to see some background on how it can be used. > > [1] https://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/ > > On Wed, Apr 24, 2019 at 10:06 AM Jerry Vinokurov <grapesmo...@gmail.com> > wrote: > > > > My understanding of how Wait/Notify worked was that the Wait processor > > would look for the count in the target signal to reach a specific value, at > > which point it would open and let through any flowfiles that were waiting. > > I'm not sure if the target value that it's looking for is actually a delta > > between whatever the cached value is and some internally-stored state that > > Wait maintains; it would make sense to me if that were the case because it > > would eliminate the need to keep resetting the counter. But I'm fairly > > certain that this will not work the way you describe it, where the > > running_count property has to be set to something on the flowfile in order > > for the flowfile to go through. > > > > On Tue, Apr 23, 2019 at 10:39 PM Shawn Weeks <swe...@weeksconsulting.us> > > wrote: > >> > >> Running into some additional inconsistencies. I’m under the impression > >> that only the Notify Processor can increment the counter. In example I’m > >> testing I have Notify that sets the signal count “running_count” to zero > >> and then immediately after it I have a Notify that increments by 2 yet > >> when I run the groovy cache dumps script on > >> http://funnifi.blogspot.com/2016/04/inspecting-your-nifi.html I can see > >> “running_count” may be set to something much higher like 50 or 60. Another > >> seems to be that the Wait Processor lets things through that don’t match > >> the Target Signal Count. For example if the target signal count is 1 then > >> it’s letting things through that have a target signal count of 2? I’ve got > >> to be missing something rather obvious. > >> > >> > >> > >> Thanks > >> > >> Shawn > >> > >> > >> > >> From: Shawn Weeks <swe...@weeksconsulting.us> > >> Sent: Tuesday, April 23, 2019 8:32 PM > >> To: users@nifi.apache.org > >> Subject: Implementing Gates with the Wait and Notify Processors > >> > >> > >> > >> I’m working to implement a flow where for a given source of data I can > >> only be processing one set at a time due to external dependencies. Each > >> set needs to go through several different steps so this isn’t just a > >> matter of limiting concurrency for a single processor. I’m trying to > >> implement this using Wait and Notify as a gate and I’ve ran into a couple > >> of limitations that I’m not sure how to get around. I first set my Wait > >> processors to wait for a specific counter to be reset to zero before > >> allowing a data source through but I quickly discovered that the Wait > >> Processors tries to divide the signal counter leading to a divide by zero > >> error. I’m assuming if zero isn’t a valid value for the signal counter we > >> should disallow it however since you can’t use the Notify processor to set > >> arbitrary values other than zero I’m not sure how your supposed to make a > >> 0 or 1 gate. Are you supposed to have two Notify Processors back to back > >> where one resets the counter to zero and the next increments by one? That > >> seems a bit clunky. > >> > >> > >> > >> Thoughts? > >> > >> > >> > >> Thanks > >> > >> Shawn Weeks > > > > > > > > -- > > http://www.google.com/profiles/grapesmoker