Pierre, Mark,

Although I agree it is possible to work around it with HTTP (Post/Listen/Invoke), I still think that PUB/SUB for S2S (regardless of whether it is on the same cluster or different ones) is worth discussing and implementing. That would make the implementation more natural and mature.
Pierre, don't give up :) Let's discuss the ideas some more and put together a design. In addition, I believe this idea should be combined with the "Wormhole Connections" <https://cwiki.apache.org/confluence/display/NIFI/Wormhole+Connections> idea. Together, that would give us a full design for procedural development in NiFi. Thoughts?

Ed.

On Sat, Sep 15, 2018 at 9:53 AM Pierre Villard <pierre.villard...@gmail.com> wrote:

> Hi Mark,
>
> Thanks for the answer. You're right, I was going to use
> ProcessSession.get(FlowFileFilter); and I considered setting an expiration
> date on the flow file, in case a standalone instance is not pulling data,
> to ensure that the queue does not fill up. But I didn't think about the
> data being swapped out and, you're right, we probably don't want to change
> that.
>
> The HTTP approach sounds indeed like a very good option for my use case.
> Thanks for mentioning it.
>
> Pierre
>
> Le sam. 15 sept. 2018 à 15:40, Mark Payne <marka...@hotmail.com> a écrit :
>
> > Hey Pierre,
> >
> > I'm not sure that this is the best route to go down. There are a couple
> > of problems that I think you will run into. The most important is what
> > happens when the data going to that Output Port queues up into a large
> > queue. If a NiFi instance then requests data, I presume that the Output
> > Port would determine which FlowFiles to send by calling
> > ProcessSession.get(FlowFileFilter); but currently, if I'm not mistaken,
> > that method only iterates over the data in the 'active' queue, not data
> > that is swapped out. As a result, you could have the active queue filled
> > up with data for nodes that are not pulling, and that would prevent any
> > node from pulling data.
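[Editor's sketch: the host-based selection discussed above can be modeled in plain Java. This is a minimal model, not NiFi's API: a flow file is reduced to its attribute map so the example stays self-contained, whereas a real implementation would be a FlowFileFilter handed to ProcessSession.get(FlowFileFilter). All class, method, and host names below are illustrative.]

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Minimal model of host-based pulling: keep only flow files whose
// 's2s.host' attribute matches the requesting instance's host.
// A FlowFile is modeled as just its attribute map.
public class HostBasedPulling {

    static List<Map<String, String>> selectForHost(List<Map<String, String>> activeQueue,
                                                   String requestingHost) {
        return activeQueue.stream()
                .filter(ff -> requestingHost.equals(ff.get("s2s.host")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, String>> queue = List.of(
                Map.of("s2s.host", "edge-1.example.com", "uuid", "a"),
                Map.of("s2s.host", "edge-2.example.com", "uuid", "b"),
                Map.of("s2s.host", "edge-1.example.com", "uuid", "c"));

        // edge-1 pulls: its two flow files are selected; edge-2's stays
        // queued, which is how the active queue can fill up with data for
        // nodes that are not pulling, as Mark points out above.
        System.out.println(selectForHost(queue, "edge-1.example.com").size()); // prints 2
    }
}
```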
> >
> > Even if we were to change it so that the get(FlowFileFilter) method runs
> > through swapped-out data, the expense of doing that would likely be
> > cost-prohibitive for this approach, as the disk I/O needed to constantly
> > scan the swap files would be too high. To make that approach feasible,
> > you'd probably also have to change the swap file format so that its
> > "summary" also contains a mapping of s2s.host to the count of FlowFiles
> > for that host. And this is already getting way beyond the scope, I
> > think, of what you want to do here.
> >
> > Additionally, I feel like where this concept is heading is difficult to
> > explain and is designed for a rather specific use case, because it turns
> > this into a sort of quasi-pub-sub mechanism but not a true pub/sub.
> >
> > Rather, I would propose that when the desire is to push data to a
> > specific NiFi node, the preferred approach is not to use Site-to-Site
> > (as that's intended to be point-to-point between NiFi instances/clusters
> > for well-established endpoints). Typically, the approach taken for a
> > scenario like this would be to have a ListenHTTP processor run on each
> > of the instances. They can push to the central instance using
> > Site-to-Site. Then, rather than using an Output Port, you'd use a
> > PostHTTP processor to push the data back. PostHTTP already supports
> > Expression Language for the URL, and it has a "Send as FlowFile" option
> > that properly packages the FlowFiles together with their attributes. It
> > also handles batching together small FlowFiles, supports two-phase
> > commit to minimize the possibility of data duplication, etc. This was
> > the method used before Site-to-Site was added, and it worked quite well
> > for a long time.
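[Editor's sketch: the per-host push Mark describes works because PostHTTP's URL property accepts Expression Language, so each flow file's own 's2s.host' attribute can select its destination. The toy evaluator below only illustrates the ${attribute} substitution; NiFi's real Expression Language is far richer, and the port 8081 and /contentListener path are illustrative values, not anything mandated by ListenHTTP.]

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy stand-in for the ${attribute} substitution NiFi's Expression Language
// performs on PostHTTP's URL property. Unknown attributes resolve to "".
public class PostUrlSketch {

    private static final Pattern ATTR = Pattern.compile("\\$\\{([^}]+)}");

    static String evaluate(String template, Map<String, String> attributes) {
        Matcher m = ATTR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            m.appendReplacement(out, Matcher.quoteReplacement(
                    attributes.getOrDefault(m.group(1), "")));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Each flow file carries its origin host, so the same template
        // routes every flow file back to the instance that generated it.
        String url = evaluate("http://${s2s.host}:8081/contentListener",
                Map.of("s2s.host", "edge-1.example.com"));
        System.out.println(url); // prints http://edge-1.example.com:8081/contentListener
    }
}
```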
> > Site-to-Site was added for convenience, so that users could just point
> > to a given URL, be provided the list of available ports, and have it
> > auto-load-balance across the cluster (if applicable). But in your use
> > case, neither of these really benefits you, because you don't know the
> > URL to send to a priori and you already know exactly which node to push
> > to.
> >
> > Thanks
> > -Mark
> >
> >
> > > On Sep 15, 2018, at 9:05 AM, Pierre Villard <pierre.villard...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > Here is my use case: I have multiple NiFi standalone instances
> > > deployed over multiple sites (those could be MiNiFi instances) and a
> > > central NiFi cluster. The standalone instances generate data, and the
> > > data is sent to the central cluster to be parsed and enriched before
> > > being sent back to the standalone instances. The data needs to go back
> > > to where it was generated.
> > >
> > > At the moment, since an RPG cannot be configured using EL and flow
> > > file attributes, you need one port (or one RPG, if the RPG is on the
> > > central NiFi's side) per standalone instance. And I don't think that
> > > changing the RPG to handle flow file attributes would be a good idea
> > > in terms of implementation.
> > >
> > > Instead, I'd like to change the S2S protocol to allow RPG pulling
> > > based on flow file attributes.
> > >
> > > On the standalone instances, we would have:
> > > workflow generating data => RPG => workflow receiving enriched data
> > > from the central cluster
> > >
> > > On the NiFi cluster, we would have:
> > > input port => workflow parsing and enriching data => output port
> > >
> > > The idea would be that, when configuring an output port in the RPG,
> > > it'd be possible to enable "host-based pulling" so that only flow
> > > files having the attribute 's2s.host' matching the host of the
> > > instance hosting the RPG would be pulled.
> > > (The s2s.port attribute is already set when data is sent through
> > > S2S.)
> > >
> > > I already started working on that approach, and even though I don't
> > > have something fully functional yet, I wanted to discuss it here to be
> > > sure this would be interesting for the wider community and, also, that
> > > I'm not missing something obvious that would prevent it.
> > >
> > > Happy to file a JIRA if that sounds interesting.
> > >
> > > Thanks,
> > > Pierre
> >
> >
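[Editor's sketch: Mark notes above that making host-based pulling feasible against swapped-out data would require the swap file "summary" to carry a mapping of s2s.host to FlowFile count, so a pull can skip swap files that hold nothing for the requesting host. The sketch below only shows how such a per-host count would be built; the names are illustrative, not NiFi's actual swap file classes.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative per-host summary for one swap file's contents: a map of
// s2s.host -> number of swapped FlowFiles carrying that host attribute.
// FlowFiles are modeled as attribute maps to keep the sketch self-contained.
public class SwapSummarySketch {

    static Map<String, Integer> countPerHost(List<Map<String, String>> swappedFlowFiles) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map<String, String> ff : swappedFlowFiles) {
            counts.merge(ff.getOrDefault("s2s.host", "unknown"), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> summary = countPerHost(List.of(
                Map.of("s2s.host", "edge-1.example.com"),
                Map.of("s2s.host", "edge-1.example.com"),
                Map.of("s2s.host", "edge-2.example.com")));

        // With this summary available, a pull for edge-2 could skip swap
        // files whose count for edge-2.example.com is zero, avoiding the
        // constant swap-file scanning Mark flags as too expensive.
        System.out.println(summary.get("edge-1.example.com")); // prints 2
    }
}
```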