Thanks for your quick response, Koji. I didn't come across the NiFi record
data model when I was reading the NiFi documentation. Could you tell me
where this model is documented? Thanks.

By the way, to my knowledge, when you use a DistributedMapCacheServer
through DistributedMapCacheClientService, you have to point the client
service at the server's hostname and port. Inside a NiFi cluster, that
means if the node hosting the cache server suddenly goes down, I can't use
the cache until that node comes back up, right? Is there currently a cache
server implementation in NiFi that supports HA? Thanks.
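
For reference, this is roughly how I'm planning to wire the cache client
into my custom processor (just a sketch; the class and property names are
ones I made up, not from an existing processor):

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.ProcessContext;

public class WaitBatchWiringSketch {

    // The controller service this resolves to (DistributedMapCacheClientService)
    // is configured with a single Server Hostname / Server Port, which is
    // why I'm asking about the node going down.
    static final PropertyDescriptor CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("distributed-map-cache-service")
            .displayName("Distributed Map Cache Service")
            .identifiesControllerService(DistributedMapCacheClient.class)
            .required(true)
            .build();

    DistributedMapCacheClient lookupCache(final ProcessContext context) {
        return context.getProperty(CACHE_SERVICE)
                .asControllerService(DistributedMapCacheClient.class);
    }
}

Also, if I read the Wait/Notify code correctly, a plain get/put pair on
DistributedMapCacheClient isn't atomic, so I assume the counter update
would need the fetch/replace style calls on AtomicDistributedMapCacheClient
that Wait/Notify use.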

Regards,
Ben

2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> As you found from existing code, DistributedMapCache is used to share
> state among different processors, and it can be used by your custom
> processors, too.
> However, I'd recommend avoiding such tight dependencies between
> FlowFiles if possible, or at least minimizing the part of the flow that
> requires that constraint, for better performance and simplicity.
> For example, since a FlowFile can hold a fairly large amount of data,
> you could merge all the FlowFiles into a single FlowFile instead of
> working with batches of FlowFiles. If you need logical boundaries, you
> can use the NiFi Record data model to embed multiple records within a
> FlowFile; Records should perform better.
>
> Hope this helps.
>
> Thanks,
> Koji
>
>
> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
> > status between my custom processors.
> > Our requirement is like this: we're doing some ETL work using NiFi, and
> > I'm extracting the data from a DB into batches of FlowFiles (each batch
> > of FlowFiles has a flag FlowFile indicating the end of the batch).
> > There are some groups of custom processors downstream that need to
> > process these FlowFiles to do some business-logic work, and we expect
> > these processors to process one batch of FlowFiles at a time.
> > Therefore we need to implement a custom Wait processor (let's just call
> > it WaitBatch here) to hold back all the other batches of FlowFiles
> > while the business processors are handling the batch of FlowFiles that
> > was created earlier.
> >
> > In order to implement this, all the WaitBatch processors placed in the
> > flow need to read/update records in a shared map so that each set of
> > business-logic processors processes one batch at a time.
> > The entries are keyed by the batch number of the FlowFiles, and the
> > value of each entry is a batch release counter that counts the number
> > of times the batch of FlowFiles has passed through a WaitBatch
> > processor.
> > When a batch is released by a WaitBatch, it will try to increment the
> > batch number entry's value by 1, and the released batch number and
> > counter value will also be saved locally at the WaitBatch with the
> > StateManager.
> > When the next batch reaches the WaitBatch, it will check whether the
> > counter value of the previously released batch number in the shared map
> > is greater than the one saved locally; if the entry for that batch
> > number doesn't exist (already removed) or the value in the shared map
> > is greater, the next batch will be released, and the local state and
> > the entry in the shared map will be updated similarly.
> > At the end of the flow, a custom processor will get the batch number
> > from each batch and remove the entry from the shared map.
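> >
> > To make that check concrete, this is roughly what I have in mind for
> > the WaitBatch release decision (only a rough sketch; the class, method
> > and key handling are made up for illustration, and a missing local
> > value is treated as 0, which is an assumption on my side):
> >
> > import java.io.IOException;
> > import java.nio.charset.StandardCharsets;
> >
> > import org.apache.nifi.components.state.Scope;
> > import org.apache.nifi.components.state.StateManager;
> > import org.apache.nifi.distributed.cache.client.Deserializer;
> > import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
> > import org.apache.nifi.distributed.cache.client.Serializer;
> >
> > class WaitBatchReleaseSketch {
> >
> >     private static final Serializer<String> STR_SER =
> >             (value, out) -> out.write(value.getBytes(StandardCharsets.UTF_8));
> >     private static final Deserializer<String> STR_DESER =
> >             bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
> >
> >     // Release the next batch if the previously released batch's entry
> >     // was already removed from the shared map, or if its counter in the
> >     // shared map is greater than the counter saved locally.
> >     static boolean shouldReleaseNextBatch(final String previousBatch,
> >                                           final DistributedMapCacheClient sharedMap,
> >                                           final StateManager stateManager) throws IOException {
> >         final String shared = sharedMap.get(previousBatch, STR_SER, STR_DESER);
> >         if (shared == null) {
> >             return true;
> >         }
> >         final String local = stateManager.getState(Scope.LOCAL).get(previousBatch);
> >         final long localCounter = local == null ? 0L : Long.parseLong(local);
> >         return Long.parseLong(shared) > localCounter;
> >     }
> > }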
> >
> > So this implementation requires a shared map that can be read and
> > updated frequently and atomically. I checked the Wait/Notify processors
> > in NiFi and saw that they use the DistributedMapCacheClientService and
> > DistributedMapCacheServer to sync status, so I'm wondering if I could
> > use the DistributedMapCacheClientService to implement my logic. I also
> > saw another implementation called RedisDistributedMapCacheClientService,
> > which seems to require Redis (I haven't used Redis). Thanks in advance
> > for any suggestions.
> >
> > Regards,
> > Ben
>
