Hey Pierre,

I'm not sure that this is the best route to go down. There are a couple of problems that I think you will run into. The most important is what happens when the data going to that Output Port backs up into a large queue. If a NiFi instance then requests data, I presume that the Output Port would determine which FlowFiles to send by calling ProcessSession.get(FlowFileFilter). But currently, if I'm not mistaken, that method only iterates over the data in the 'active' queue, not data that is swapped out. As a result, you could have the active queue filled up with data for nodes that are not pulling, and that would prevent any node from pulling data.
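To make the starvation concern concrete, here is a toy model in Python. It is only a sketch and not NiFi code: ToyQueue, active_limit, put and poll are made-up names, the "swapped" list stands in for swap files on disk, and the real swap threshold is far larger than the tiny limit used here. The one behavior it borrows from the description above is that the filter only ever sees the active portion of the queue.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class FlowFile:
    id: int
    host: str  # stands in for the 's2s.host' attribute


class ToyQueue:
    """Hypothetical queue with an in-memory 'active' part and a 'swapped' part."""

    def __init__(self, active_limit):
        self.active_limit = active_limit
        self.active = deque()
        self.swapped = deque()  # stands in for swap files on disk

    def put(self, flow_file):
        # Once the active part is full, or anything is already swapped,
        # new arrivals go to the swapped part so ordering is preserved.
        if len(self.active) < self.active_limit and not self.swapped:
            self.active.append(flow_file)
        else:
            self.swapped.append(flow_file)

    def poll(self, accept):
        # Only the active part is inspected; swapped data is never scanned.
        selected = [ff for ff in self.active if accept(ff)]
        for ff in selected:
            self.active.remove(ff)
        # Swap data back in only when the active part has room.
        while self.swapped and len(self.active) < self.active_limit:
            self.active.append(self.swapped.popleft())
        return selected


queue = ToyQueue(active_limit=3)
for i in range(3):
    queue.put(FlowFile(i, "site-a"))  # site-a is not pulling
for i in range(3, 5):
    queue.put(FlowFile(i, "site-b"))  # site-b is pulling

pulled_by_b = queue.poll(lambda ff: ff.host == "site-b")
print(pulled_by_b)
```

In this model the poll for site-b returns nothing, even though two FlowFiles are waiting for it, because the active part is full of site-a data. Its data only becomes visible after site-a pulls and frees room in the active part.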
Even if we were to change it so that the get(FlowFileFilter) method runs through swapped-out data, the disk I/O needed to constantly scan the swap files would likely make this approach cost-prohibitive. To make it feasible you'd probably also have to change the Swap File format so that its "summary" also contains a mapping of S2S.host to the count of FlowFiles for that host. And this is already getting way beyond the scope, I think, of what you want to do here.

Additionally, I feel like where this concept is heading is difficult to explain and is designed for a rather specific use case, because it starts to make this into a sort of quasi-pub-sub mechanism but not a true pub/sub.

Rather, I would propose that when the desire is to push data to a specific NiFi node, the preferred approach is not to use Site-to-Site (as that's intended to be point-to-point between NiFi instances/clusters for well-established endpoints). Typically, the approach taken for a scenario like this is to have a ListenHTTP processor run on each of the instances. They can push to the central instance using Site-to-Site. Then, rather than using an Output Port, you'd use a PostHTTP processor to push the data back. PostHTTP already supports Expression Language for the URL, and it has a "Send as FlowFile" option that properly packages the FlowFiles together with their attributes. It also handles batching together small FlowFiles, supports two-phase commit to minimize the possibility of data duplication, etc.

This was the method that was used before Site-to-Site was added, and it worked quite well for a long time. Site-to-Site was added for convenience so that users could just point to a given URL and be provided the list of available ports and have it auto-load balance across the cluster (if applicable).
But in your use case, neither of these really benefits you because you don't know the URL to send to a priori and you already know exactly which node to push to.

Thanks
-Mark

> On Sep 15, 2018, at 9:05 AM, Pierre Villard <pierre.villard...@gmail.com> wrote:
>
> Hi all,
>
> Here is my use case: I've multiple NiFi standalone instances deployed over
> multiple sites (that could be MiNiFi instances) and a central NiFi
> cluster. The standalone instances generate data, the data is sent to the
> central cluster to be parsed and enriched before being sent back to the
> standalone instances. The data needs to go back where it's been generated.
>
> At the moment, since RPG cannot be configured using EL and FFs attributes,
> you need to have one port (or one RPG if the RPG is on central NiFi's side)
> per standalone instance. And I don't think that changing the RPG to handle
> FFs attributes scope would be a good idea in terms of implementation.
>
> Instead I'd like to change the S2S protocol to allow RPG pulling based on
> FFs attributes.
>
> On the standalone instances, we would have:
> Workflow generating data => RPG => workflow receiving enriched data from
> central cluster
>
> On the NiFi cluster, we would have:
> input port => workflow parsing and enriching data => output port
>
> The idea would be that, when configuring an output port in the RPG, it'd be
> possible to enable "host based pulling" so that only flow files having the
> attribute 's2s.host' matching the host of the instance hosting the RPG
> would be pulled. (the s2s.port attribute is already set when data is sent
> through S2S).
>
> I already started working on that approach and even though I don't have
> something fully functional yet, I wanted to discuss it here to be sure this
> would be interesting for the wider community and, also, if I'm not missing
> something obvious that would prevent it.
>
> Happy to file a JIRA if that sounds interesting.
>
> Thanks,
> Pierre