Carlos, Do you have the "duplicate" relationship being routed back to the DetectDuplicate? Also if you are trying to do different tables in "parallel" (which is just "concurrent" on a single NiFi instance), I assume you have set Max Concurrent Tasks in IncrementalLoadData to something greater than 1? How are you populating your DistributedMapCache? With a "run-once" separate flow?
An issue I see here is that once the table name is removed from the cache, then unless something "resets the lock", all future flow files with that table name will be processed. If for some reason you are guaranteed to have only two duplicate flow files per table name, this might work (although you would have to manually refresh the cache before running again).

An alternative might be to replace DetectDuplicate with PutDistributedMapCache, setting the Cache Update Strategy to "keep original". Then each flow file will get a "cached" attribute: the first flow file (i.e. when no cache value is yet present) will have "cached" set to true, and the remainder (until the cache entry is removed) will have "cached" set to false, so you can use RouteOnAttribute to send the non-cached flow files back to PutDistributedMapCache. In this solution you'd still be relying on a "RemoveDistributedMapCache" processor, which does not exist, but you could use the Groovy script described in my last response.

This "lock" idea is interesting; I will give it some more thought. Perhaps there is something we could do in the framework and/or extensions to enable this, if it is a common use case. There have been various Jira cases and discussions about barriers and general aggregation patterns, though I'm not sure how/if they'd apply here.

Regards,
Matt

On Mon, Feb 13, 2017 at 2:08 PM, Carlos Manuel Fernandes (DSI) <carlos.antonio.fernan...@cgd.pt> wrote:
> Thanks Matt for your quick response.
>
> My problem isn't to process the same table twice, but to guarantee that I don't
> process the same table at the same time; what I wish to achieve is a
> synchronized process for each table.
>
> Regards
> Carlos
>
> -----Original Message-----
> From: Matt Burgess [mailto:mattyb...@apache.org]
> Sent: Monday, February 13, 2017 18:25
> To: users@nifi.apache.org
> Subject: Re: RemoveDistributedMapCache
>
> Carlos,
>
> With a RemoveDistributedMapCache processor in your suggested flow, there
> might be an issue depending on when the duplicates are routed off. For
> example, if the first time we see the table name, that flow file gets all the
> way through to RemoveDistributedMapCache before a duplicate has been detected
> by DetectDuplicate, then the cache entry would be removed and you could
> process the same table twice. I guess the question here is: how do you know
> when you're "done" with the cache value?
>
> Also FWIW, speaking of my Groovy DCache script, you can use it (or parts of
> it) in an ExecuteScript processor to emulate the functionality of a
> RemoveDistributedMapCache processor.
>
> Regards,
> Matt
>
>
> On Mon, Feb 13, 2017 at 12:54 PM, Carlos Manuel Fernandes (DSI)
> <carlos.antonio.fernan...@cgd.pt> wrote:
>> Hello,
>>
>> I'm using NiFi to replicate tables from one relational
>> database (Mainframe) to another database, with incremental updates
>> based on a timestamp and primary key. The process is made with three custom
>> processors:
>> GenerateListOfTablesToSyncronize -> CreteTableIfNotExists ->
>> IncrementalLoadData. If by mistake, in
>> GenerateListOfTablesToSyncronize, I generate the same table twice, I
>> must guarantee that the two flows run sequentially, not in parallel. For that, I
>> need some kind of lock, and the MapCache
>> processors seem to be the solution.
>> The solution I see is:
>>
>> GenerateListOfTablesToSyncronize -> DetectDuplicate (tableName, with no Age
>> Off) -> CreteTableIfNotExists -> IncrementalLoadData ->
>> RemoveDistributedMapCache (tableName)
>>
>> Unfortunately there is no RemoveDistributedMapCache processor. I
>> could handle this thanks to Matt Burgess
>> (https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html),
>> which makes it possible to manipulate the cache directly using Groovy. Has no one
>> else had this kind of requirement, to justify the creation of a
>> RemoveDistributedMapCache processor?
>>
>> Thanks
>>
>> Carlos
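[Editor's note] The cache-as-lock semantics discussed in this thread can be sketched outside NiFi. Below is a minimal Python sketch, under stated assumptions: the class and method names (TableLockCache, try_acquire, release) are hypothetical, and a plain in-memory dict stands in for the DistributedMapCacheServer. It models PutDistributedMapCache with Cache Update Strategy "keep original" (first caller wins, later callers see "cached" = false) plus the removal step that a hypothetical RemoveDistributedMapCache processor would perform to release the lock.

```python
# Sketch of the cache-based per-table lock discussed above.
# A plain dict stands in for the DistributedMapCacheServer; in NiFi the
# equivalent behavior comes from PutDistributedMapCache ("keep original")
# and a script-based cache removal.

class TableLockCache:
    def __init__(self):
        self._cache = {}

    def try_acquire(self, table_name):
        """Emulates PutDistributedMapCache with Cache Update Strategy
        'keep original': returns True only for the first caller; later
        callers get False until the entry is removed."""
        if table_name in self._cache:
            return False
        self._cache[table_name] = "locked"
        return True

    def release(self, table_name):
        """Emulates the hypothetical RemoveDistributedMapCache step,
        run after IncrementalLoadData finishes for the table."""
        self._cache.pop(table_name, None)

cache = TableLockCache()
assert cache.try_acquire("CUSTOMERS") is True    # first flow file: cached=true
assert cache.try_acquire("CUSTOMERS") is False   # duplicate: cached=false, loop back
cache.release("CUSTOMERS")                       # release the lock after the load
assert cache.try_acquire("CUSTOMERS") is True    # table can be processed again
```

Note that this also illustrates Matt's race concern: if release() runs before a duplicate flow file reaches try_acquire(), that duplicate acquires the lock and the table is processed a second time.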