Hi Ben,

This blog post written by Mark, would be a good starting point to get
familiar with NiFi Record model.
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

HA for DistributedMapCacheClientService and DistributedMapCacheServer
pair is not supported at the moment. If you need HighAvailability,
RedisDistributedMapCacheClientService with Redis replication will
provide that, I haven't tried that myself though.
https://redis.io/topics/replication

Thanks,
Koji

On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
> Thanks for your quick response, Koji, I haven't heard and seen anything
> about the NiFi record data model when I was reading the NiFi
> documentations,could you tell me where this model is documented? Thanks.
>
> By the way, to my knowledge, when you need to use the 
> DistributedMapCacheServer
> from DistributedMapCacheClientService, you need to specify the host url for
> the server, this means inside a NiFi cluster
> when I specify the cache server and the node suddenly went down, I couldn't
> possibly use it until the node goes up again right? Is there currently such
> a cache server in NiFi that could support HA? Thanks.
>
> Regards,
> Ben
>
> 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> As you found from existing code, DistributedMapCache is used to share
>> state among different processors, and it can be used by your custom
>> processors, too.
>> However, I'd recommend to avoid such tight dependencies between
>> FlowFiles if possible, or minimize the part in flow that requires that
>> constraint at least for better performance and simplicity.
>> For example, since a FlowFile can hold fairly large amount of data,
>> you could merge all FlowFiles in a single FlowFile, instead of batches
>> of FlowFiles. If you need logical boundaries, you can use NiFi Record
>> data model to embed multiple records within a FlowFile, Record should
>> perform better.
>>
>> Hope this helps.
>>
>> Thanks,
>> Koji
>>
>>
>> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Hi guys, I'm currently trying to find a proper way in nifi which could
>> sync
>> > status between my custom processors.
>> > our requirement is like this, we're doing some ETL work using nifi and
>> I'm
>> > extracting the data from DB into batches of FlowFiles(each batch of
>> > FlowFile has a flag FlowFile indicating the end of the batch).
>> > There're some groups of custom processors downstream that need to process
>> > these FlowFiles to do some business logic work. And we expect these
>> > processors to process one batch of FlowFiles at a time.
>> > Therefore we need to implement a custom Wait processor(let's just call it
>> > WaitBatch here) to hold all the other batches of FlowFiles while the
>> > business processors were handling the batch of FlowFiles whose creation
>> > time is earlier.
>> >
>> > In order to implement this, all the WaitBatch processors placed in the
>> flow
>> > need to read/update records in a shared map so that each set of
>> > business-logic processors process one batch at a time.
>> > The entries are keyed using the batch number of the FlowFiles and the
>> value
>> > of each entry is a batch release counter number which counts the number
>> of
>> > times the batch of FlowFiles has passed through
>> > a WaitBatch processor.
>> > When a batch is released by WaitBatch, it will try to increment the batch
>> > number entry's value by 1 and then the released batch number and counter
>> > number will also be saved locally at the WaitBatch with StateManager;
>> > when the next batch reaches the WaitBatch, it will check if the counter
>> > value of the previous released batch number in the shared map is greater
>> > than the one saved locally, if the entry for the batch number does't
>> > exist(already removed) or the value in the shared map is greater, the
>> next
>> > batch will be released and the local state and the entry on the shared
>> map
>> > will be updated similarly.
>> > In the end of the flow, a custom processor will get the batch number from
>> > each batch and remove the entry from the shared map .
>> >
>> > So this implementation requires a shared map that could read/update
>> > frequently and atomically. I checked the Wait/Notify processors in NIFI
>> and
>> > saw it is using the DistributedMapCacheClientService and
>> > DistributedMapCacheServer to sync status, so I'm wondering if I could use
>> > the DistributedMapCacheClientService to implement my logic. I also saw
>> > another implementation called RedisDistributedMapCacheClientService
>> > which seems to require Redis(I haven't used Redis).  Thanks in advance
>> for
>> > any suggestions.
>> >
>> > Regards,
>> > Ben
>>

Reply via email to