Hi guys, I'm trying to find a proper way in NiFi to sync status between my
custom processors.
Our requirement is as follows: we're doing some ETL work with NiFi, and I'm
extracting data from a DB into batches of FlowFiles (each batch includes a
flag FlowFile that marks the end of the batch).
Several groups of custom processors downstream need to process these
FlowFiles to apply business logic, and we expect each group to process one
batch of FlowFiles at a time.
Therefore we need to implement a custom Wait processor (let's just call it
WaitBatch here) that holds all other batches of FlowFiles while the business
processors are handling the batch with the earliest creation time.
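
To make the ordering rule concrete, here is a rough plain-Java sketch of what
WaitBatch has to decide. It is not NiFi API code: `BatchItem` and
`BatchBuffer` are hypothetical names, FlowFiles are reduced to three
attributes, and it assumes a batch is only released once its flag FlowFile
has arrived. In the real processor the FlowFiles would stay queued in the
connection rather than in an in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a FlowFile: only the attributes this sketch needs.
final class BatchItem {
    final long batchNumber;
    final long creationTime;
    final boolean endOfBatch; // true for the flag FlowFile that closes the batch

    BatchItem(long batchNumber, long creationTime, boolean endOfBatch) {
        this.batchNumber = batchNumber;
        this.creationTime = creationTime;
        this.endOfBatch = endOfBatch;
    }
}

// Buffers items per batch and decides which batch may go next.
class BatchBuffer {
    private final Map<Long, List<BatchItem>> pending = new HashMap<>();
    private final Map<Long, Long> earliestCreation = new HashMap<>();
    private final Set<Long> complete = new HashSet<>();

    void add(BatchItem item) {
        pending.computeIfAbsent(item.batchNumber, k -> new ArrayList<>()).add(item);
        earliestCreation.merge(item.batchNumber, item.creationTime, Math::min);
        if (item.endOfBatch) {
            complete.add(item.batchNumber);
        }
    }

    // The oldest buffered batch, but only once its flag item has arrived;
    // a newer complete batch never overtakes an older incomplete one.
    Optional<Long> nextBatch() {
        return earliestCreation.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .filter(complete::contains);
    }

    // Removes and returns every buffered item of the given batch.
    List<BatchItem> release(long batchNumber) {
        complete.remove(batchNumber);
        earliestCreation.remove(batchNumber);
        List<BatchItem> items = pending.remove(batchNumber);
        return items == null ? new ArrayList<>() : items;
    }
}
```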

To implement this, all the WaitBatch processors in the flow need to
read/update records in a shared map so that each set of business-logic
processors processes one batch at a time.
The entries are keyed by the FlowFiles' batch number, and the value of each
entry is a batch release counter, which counts how many times that batch of
FlowFiles has passed through a WaitBatch processor.
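
Within a single JVM, the shared map and its update rule could look like the
sketch below (plain Java, `BatchCounterMap` is a hypothetical name).
`ConcurrentHashMap.merge` applies the increment atomically per key, which is
the property I need; the open question is what gives me the same guarantee
across processors and cluster nodes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical single-JVM stand-in for the shared map: batch number -> release counter.
class BatchCounterMap {
    private final ConcurrentMap<Long, Long> counters = new ConcurrentHashMap<>();

    // Atomically adds 1 to the batch's counter (creating it at 1) and returns the new value.
    long recordRelease(long batchNumber) {
        return counters.merge(batchNumber, 1L, Long::sum);
    }

    // Current counter, or null if the entry is absent (never created or already removed).
    Long counter(long batchNumber) {
        return counters.get(batchNumber);
    }

    // Called at the end of the flow once the batch is fully processed.
    void remove(long batchNumber) {
        counters.remove(batchNumber);
    }
}
```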
When WaitBatch releases a batch, it tries to increment that batch number's
entry in the shared map by 1, and it also saves the released batch number and
counter value locally with the StateManager.
When the next batch reaches the WaitBatch, it checks whether the counter
value for the previously released batch number in the shared map is greater
than the one saved locally. If the entry for that batch number doesn't exist
(it has already been removed) or the value in the shared map is greater, the
next batch is released, and the local state and the shared map entry are
updated in the same way.
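
The release check above, sketched in plain Java under a few assumptions:
`WaitBatchGate` is a hypothetical name, a `ConcurrentMap` stands in for the
shared map, and a `Map<String, String>` stands in for the processor's local
state (in NiFi I would expect to persist those two values through the
StateManager with a local scope). `tryRelease` is meant to be called once per
batch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of one WaitBatch processor's release decision (plain Java, not the NiFi API).
class WaitBatchGate {
    private static final String RELEASED_BATCH = "released.batch";
    private static final String RELEASED_COUNTER = "released.counter";

    private final ConcurrentMap<Long, Long> shared; // stand-in for the shared map: batch number -> release counter
    private final Map<String, String> localState = new HashMap<>(); // mimics local processor state (string key/values)

    WaitBatchGate(ConcurrentMap<Long, Long> shared) {
        this.shared = shared;
    }

    // Called once per batch; returns true (and records the release) if the batch may pass now.
    boolean tryRelease(long batchNumber) {
        String previous = localState.get(RELEASED_BATCH);
        if (previous != null) {
            long localCounter = Long.parseLong(localState.get(RELEASED_COUNTER));
            Long sharedCounter = shared.get(Long.parseLong(previous));
            // The previous batch has moved on if its entry is gone or a later WaitBatch bumped it.
            if (sharedCounter != null && sharedCounter <= localCounter) {
                return false;
            }
        }
        long counter = shared.merge(batchNumber, 1L, Long::sum);
        localState.put(RELEASED_BATCH, Long.toString(batchNumber));
        localState.put(RELEASED_COUNTER, Long.toString(counter));
        return true;
    }
}
```

Only the increment has to be atomic in the shared map; the read-and-compare
stays local to one WaitBatch, assuming each WaitBatch instance runs one task
at a time, because it only compares against the counter value it recorded
itself.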
At the end of the flow, a custom processor gets the batch number from each
batch and removes its entry from the shared map.
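
A sketch of that cleanup step, again plain Java with a hypothetical
`BatchCleanup` class. It assumes the entry should be removed only when the
batch's flag FlowFile arrives, and that the flag FlowFile is the last of its
batch to reach this processor, so the next batch isn't released upstream while
part of this batch is still in flight.

```java
import java.util.concurrent.ConcurrentMap;

// Hypothetical end-of-flow cleanup step (plain Java, not the NiFi API).
class BatchCleanup {
    private final ConcurrentMap<Long, Long> shared; // stand-in for the shared map: batch number -> release counter

    BatchCleanup(ConcurrentMap<Long, Long> shared) {
        this.shared = shared;
    }

    // Removes the batch's entry when its flag FlowFile arrives; returns true if an entry was removed.
    boolean onFlowFile(long batchNumber, boolean endOfBatch) {
        if (!endOfBatch) {
            return false; // assumption: only the flag FlowFile marks the batch as finished
        }
        return shared.remove(batchNumber) != null;
    }
}
```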

So this implementation requires a shared map that supports frequent, atomic
reads and updates. I checked the Wait/Notify processors in NiFi and saw that
they use DistributedMapCacheClientService and DistributedMapCacheServer to
sync status, so I'm wondering whether I could use
DistributedMapCacheClientService to implement my logic. I also saw another
implementation, RedisDistributedMapCacheClientService, which seems to require
Redis (I haven't used Redis). Thanks in advance for any suggestions.
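
If I read the Wait/Notify code correctly, they go through
AtomicDistributedMapCacheClient rather than only the plain client: it adds a
fetch/replace pair guarded by a revision (optimistic locking), whereas plain
get/put calls alone would make a read-then-increment racy. The exact method
signatures differ between NiFi versions, so the sketch below does not call
NiFi; `RevisionedCache` and `CounterClient` are hypothetical names that model
the pattern: an increment built from a fetch plus a conditional replace,
retried on conflict.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a cache with revision-based optimistic locking.
class RevisionedCache {
    // Immutable snapshot of an entry: value plus the revision it was read at.
    static final class Entry {
        final long value;
        final long revision;

        Entry(long value, long revision) {
            this.value = value;
            this.revision = revision;
        }
    }

    private final Map<Long, Entry> entries = new HashMap<>();

    // Returns the current entry, or null if absent.
    synchronized Entry fetch(long key) {
        return entries.get(key);
    }

    // Writes only if the stored revision still equals expectedRevision (-1 means "expect absent").
    synchronized boolean replace(long key, long value, long expectedRevision) {
        Entry current = entries.get(key);
        long currentRevision = current == null ? -1 : current.revision;
        if (currentRevision != expectedRevision) {
            return false; // someone else updated the entry since our fetch
        }
        entries.put(key, new Entry(value, currentRevision + 1));
        return true;
    }
}

// Increment built from fetch + conditional replace, retried on conflict.
class CounterClient {
    private final RevisionedCache cache;

    CounterClient(RevisionedCache cache) {
        this.cache = cache;
    }

    long increment(long batchNumber) {
        while (true) {
            RevisionedCache.Entry e = cache.fetch(batchNumber);
            long next = (e == null ? 0 : e.value) + 1;
            long expected = e == null ? -1 : e.revision;
            if (cache.replace(batchNumber, next, expected)) {
                return next;
            }
            // Conflict: another WaitBatch won the race; re-read and retry.
        }
    }
}
```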

Regards,
Ben
