Re: RemoveDistributedMapCache

2017-02-13 Thread Matt Burgess
Carlos,

With a RemoveDistributedMapCache processor in your suggested flow,
there might be an issue depending on when the duplicates are routed
off.  For example, if the first time we see the table name, that flow
file gets all the way through to RemoveDistributedMapCache before a
duplicate has been detected by DetectDuplicate, then the cache entry
would be removed and you could process the same table twice. I guess
the question here is: how do you know when you're "done" with the
cache value?

Also FWIW, speaking of my Groovy DCache script, you can use it (or
parts of it) in an ExecuteScript processor to emulate the
functionality of a RemoveDistributedMapCache processor.
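A minimal sketch of what such a removal step might look like inside ExecuteScript (Groovy), loosely following the DCache article. This is a sketch under assumptions, not Matt's exact script: the controller service name "cache" and the flow file attribute name "tableName" are hypothetical, and the key serializer must match how DetectDuplicate wrote the entry (UTF-8 string):

```groovy
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (flowFile == null) return

// Look up the DistributedMapCacheClient controller service by name
// (the service name "cache" is an assumption for this sketch)
def lookup = context.controllerServiceLookup
def cacheId = lookup.getControllerServiceIdentifiers(DistributedMapCacheClient).find {
    lookup.getControllerServiceName(it) == 'cache'
}
def cache = lookup.getControllerService(cacheId) as DistributedMapCacheClient

// Serialize the key the same way DetectDuplicate stored it (UTF-8 bytes)
def keySerializer = { value, out ->
    out.write(value.getBytes(StandardCharsets.UTF_8))
} as Serializer<String>

// Remove the cache entry for this table, then pass the flow file along
cache.remove(flowFile.getAttribute('tableName'), keySerializer)
session.transfer(flowFile, REL_SUCCESS)
```

The script assumes it runs in an ExecuteScript processor, where `session`, `context`, and `REL_SUCCESS` are provided bindings.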

Regards,
Matt


On Mon, Feb 13, 2017 at 12:54 PM, Carlos Manuel Fernandes (DSI)
 wrote:
> Hello,
>
>
>
> I'm using NiFi to replicate tables from one relational database (a mainframe)
> to another database, with incremental updates based on a timestamp and
> primary key. The process is made with three custom processors:
> GenerateListOfTablesToSyncronize -> CreteTableIfNotExists ->
> IncrementalLoadData. If, by mistake, GenerateListOfTablesToSyncronize
> generates the same table twice, I must guarantee the two flows run
> sequentially, not in parallel. For that I need some kind of lock, and the
> MapCache processors seem to be the solution. The solution I see is:
>
>
>
> GenerateListOfTablesToSyncronize -> DetectDuplicate (tableName, with no Age
> Off) -> CreteTableIfNotExists -> IncrementalLoadData ->
> RemoveDistributedMapCache (tableName)
>
>
>
> Unfortunately, the RemoveDistributedMapCache processor doesn't exist. I could
> handle this thanks to Matt Burgess
> (https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html),
> which makes it possible to manipulate the cache directly using Groovy. Has no
> one had this kind of requirement, to justify the creation of
> RemoveDistributedMapCache?
>
>
>
> Thanks
>
>
>
> Carlos


RE: RemoveDistributedMapCache

2017-02-13 Thread Carlos Manuel Fernandes (DSI)
Thanks, Matt, for your quick response.

My problem isn't processing the same table twice, but guaranteeing that I don't
process the same table at the same time; what I wish to achieve is a
synchronized process for each table.

Regards 
Carlos



Re: RemoveDistributedMapCache

2017-02-13 Thread Matt Burgess
Carlos,

Do you have the "duplicate" relationship being routed back to the
DetectDuplicate?  Also if you are trying to do different tables in
"parallel" (which is just "concurrent" on a single NiFi instance), I
assume you have set Max Concurrent Tasks in IncrementalLoadData to
something greater than 1?  How are you populating your
DistributedMapCache?  With a "run-once" separate flow?

An issue I see here is that once the table name is removed from the
cache, then unless something "resets the lock", all future flow files
with that table name will still be processed. If for some reason you
are guaranteed to only have 2 duplicate flow files per table name,
this might work (although you would have to manually refresh the cache
before running again).  An alternative might be to replace
DetectDuplicate with PutDistributedMapCache, setting the Cache Update
Strategy to "keep original". Then the flow files will have a "cached"
attribute, where the first flow file (i.e. when the cache value is not
present) will have "cached" set to true and the remainder (until the
cache entry is removed) will have "cached" false, so you can use
RouteOnAttribute to send the non-cached flow files back to
PutDistributedMapCache.

In this solution you'd still be relying on a
"RemoveDistributedMapCache" processor which does not exist, but you
could use the Groovy script as described in my last response.

This "lock" idea is interesting, I will give it some more thought,
perhaps there is something we could do in the framework and/or
extensions to enable this, if it is a common use case.  There have
been different Jira cases and discussions about barriers and general
aggregation patterns, though I'm not sure how/if they'd apply here.

Regards,
Matt

On Mon, Feb 13, 2017 at 2:08 PM, Carlos Manuel Fernandes (DSI)
 wrote:
> Thanks Matt for your quickly response.
>
> My problem isn’t to process the same table twice,  but to guarantee I don’t 
> process the same table at the same time,   what I wish achieve is a 
> synchronized process for  each table.
>
> Regards
> Carlos
>


RE: RemoveDistributedMapCache

2017-02-14 Thread Carlos Manuel Fernandes (DSI)
Matt,

Yes, I have the "duplicate" relationship being routed back to DetectDuplicate.
Yes, I set Max Concurrent Tasks in IncrementalLoadData to something greater
than 1, permitting tables to be processed in "parallel". I want to process many
flows for a given tableName as long as I don't process them in parallel,
because IncrementalLoadData has three steps that must be atomic: "read the
max(timestamp) in the target table" + "read data from the source >
max(timestamp)" + "apply the data to the destination".

With this requirement, a simple scheme (put the tableName in the cache, check
whether the tableName is already in the cache and wait if so, and remove it
from the cache after the job is done) functions like a semaphore, and it works
well. I could accomplish that using your DCache code to build a custom
RemoveDistributedMapCache.

I think RemoveDistributedMapCache could also be useful for other use cases
where a simple semaphore is sufficient.
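The semaphore scheme described above can be sketched against the
DistributedMapCacheClient API roughly as follows. This is only a sketch under
assumptions: `cache`, `keySer`, and `valSer` are assumed to be set up as in the
DCache article (a client service handle plus UTF-8 serializers), and
`tableName` comes from the flow file attribute:

```groovy
// Sketch: DistributedMapCacheClient used as a per-table semaphore.
// putIfAbsent returns true only for the caller that created the entry,
// so only one flow file per tableName enters the critical section.
if (cache.putIfAbsent(tableName, 'locked', keySer, valSer)) {
    try {
        // We hold the lock: run the three atomic IncrementalLoadData steps.
        // 1. read max(timestamp) in the target table
        // 2. read source rows where timestamp > max(timestamp)
        // 3. apply the data to the destination
    } finally {
        // Release the lock even if the load fails
        cache.remove(tableName, keySer)
    }
} else {
    // Another flow file holds the lock for this table:
    // route the flow file back and retry later.
}
```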

Thanks 

Carlos


