Hi guys, I'm currently trying to find a proper way in NiFi to sync status between my custom processors. Our requirement is as follows: we're doing some ETL work using NiFi, and I'm extracting data from a DB into batches of FlowFiles (each batch of FlowFiles has a flag FlowFile indicating the end of the batch). Several groups of custom processors downstream need to process these FlowFiles to perform business logic, and we expect each group to process one batch of FlowFiles at a time. Therefore we need to implement a custom Wait processor (let's just call it WaitBatch here) that holds all other batches of FlowFiles while the business processors are handling an earlier-created batch.
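For illustration, here is a rough sketch of how a batch could be recognized as complete using the end-of-batch flag FlowFile. It is not NiFi code: `Item` is a hypothetical stand-in for a FlowFile, the `batch` and `endFlag` fields stand in for hypothetical attributes, and it assumes every data FlowFile of a batch arrives before that batch's flag FlowFile.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchAssembler {

    /** Hypothetical stand-in for a FlowFile: only the fields this sketch needs. */
    static final class Item {
        final long batch;        // hypothetical "batch number" attribute
        final boolean endFlag;   // true for the flag FlowFile that closes the batch
        final String payload;

        Item(long batch, boolean endFlag, String payload) {
            this.batch = batch;
            this.endFlag = endFlag;
            this.payload = payload;
        }
    }

    // Data items buffered per batch number until that batch's flag arrives.
    private final Map<Long, List<Item>> pending = new HashMap<>();

    /**
     * Buffers a data item and returns null; when the flag item for a batch
     * arrives, returns that batch's data items and forgets the batch.
     */
    List<Item> offer(Item item) {
        List<Item> items = pending.computeIfAbsent(item.batch, b -> new ArrayList<>());
        if (!item.endFlag) {
            items.add(item);
            return null;
        }
        return pending.remove(item.batch);
    }

    public static void main(String[] args) {
        BatchAssembler assembler = new BatchAssembler();
        assembler.offer(new Item(1, false, "row-1"));
        assembler.offer(new Item(2, false, "row-3"));
        assembler.offer(new Item(1, false, "row-2"));
        List<Item> batch1 = assembler.offer(new Item(1, true, null));
        System.out.println(batch1.size()); // 2: row-1 and row-2; batch 2 is still pending
    }
}
```

Only a complete batch would then be handed to WaitBatch, which decides when it may move on.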
To implement this, all the WaitBatch processors placed in the flow need to read and update records in a shared map, so that each set of business-logic processors processes one batch at a time. The entries are keyed by the batch number of the FlowFiles, and the value of each entry is a release counter that counts how many times that batch has passed through a WaitBatch processor.

When WaitBatch releases a batch, it tries to increment that batch number's entry by 1, and it also saves the released batch number and the counter value locally with the StateManager. When the next batch reaches the same WaitBatch, it looks up the previously released batch number in the shared map. If the entry no longer exists (it has already been removed) or its counter is greater than the one saved locally, the previous batch has moved on, so the next batch is released and both the local state and the shared map entry are updated the same way. At the end of the flow, a custom processor gets the batch number of each batch and removes its entry from the shared map. So this implementation requires a shared map that supports frequent, atomic reads and updates.

I checked the Wait/Notify processors in NiFi and saw that they use DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering whether I could use DistributedMapCacheClientService to implement my logic. I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis).

Thanks in advance for any suggestions.

Regards,
Ben
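To make the release rule above concrete, here is a minimal single-JVM sketch of it. It is an assumption-laden illustration, not a NiFi processor: a `ConcurrentHashMap` stands in for the shared map, two plain fields stand in for each WaitBatch's StateManager state, and the class and method names (`WaitBatchSketch`, `SharedBatchMap`, `tryRelease`) are hypothetical. In a real flow the increment would go through the cache client service instead; as far as I know, NiFi's atomic cache client interface (the one Wait/Notify rely on) works by fetching an entry with a revision and replacing it only if the revision is unchanged, so the increment would become a fetch/replace retry loop.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WaitBatchSketch {

    /** Stand-in for the shared map: batch number -> release counter. */
    static final class SharedBatchMap {
        private final ConcurrentMap<Long, Long> counters = new ConcurrentHashMap<>();

        /** Atomically adds 1 to a batch's counter (creating it at 1) and returns the new value. */
        long incrementAndGet(long batch) {
            return counters.merge(batch, 1L, Long::sum);
        }

        /** Returns a batch's counter, or null if its entry has been removed. */
        Long get(long batch) {
            return counters.get(batch);
        }

        /** What the custom processor at the end of the flow would do. */
        void remove(long batch) {
            counters.remove(batch);
        }
    }

    /** One WaitBatch instance; the two fields stand in for its StateManager state. */
    static final class WaitBatch {
        private final SharedBatchMap shared;
        private Long lastReleasedBatch;   // null until this instance releases its first batch
        private long lastReleasedCount;

        WaitBatch(SharedBatchMap shared) {
            this.shared = shared;
        }

        /** Called once per complete batch; returns true if the batch is released now. */
        boolean tryRelease(long batch) {
            if (lastReleasedBatch != null) {
                Long current = shared.get(lastReleasedBatch);
                // Counter not advanced since our release: previous batch still downstream.
                if (current != null && current <= lastReleasedCount) {
                    return false;
                }
            }
            // Release: bump the shared counter and remember what we released.
            lastReleasedCount = shared.incrementAndGet(batch);
            lastReleasedBatch = batch;
            return true;
        }
    }

    public static void main(String[] args) {
        SharedBatchMap map = new SharedBatchMap();
        WaitBatch first = new WaitBatch(map);   // first WaitBatch in the flow
        WaitBatch second = new WaitBatch(map);  // next WaitBatch downstream

        System.out.println(first.tryRelease(1));   // true: nothing released yet
        System.out.println(first.tryRelease(2));   // false: batch 1 not yet past `second`
        System.out.println(second.tryRelease(1));  // true: counter for batch 1 becomes 2
        System.out.println(first.tryRelease(2));   // true: counter 2 > locally saved 1
        System.out.println(second.tryRelease(2));  // false: batch 1 still after `second`
        map.remove(1);                             // end of flow removes batch 1
        System.out.println(second.tryRelease(2));  // true: entry for batch 1 is gone
    }
}
```

Note that the check and the increment are two separate steps. That seems acceptable as long as each WaitBatch instance handles one batch at a time (e.g. a single concurrent task), because the check reads the previous batch's key while the increment touches the new batch's key; only the per-key increment needs to be atomic in the shared map.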