Hey Pierre,

I'm not sure that this is the best route to go down. There are a couple
of problems that I think you will run into. The most important one: what
happens when the data going to that Output Port backs up into a large
queue? If a NiFi instance then requests data, I presume that the Output
Port would determine which FlowFiles to send by calling
ProcessSession.get(FlowFileFilter). But currently, if I'm not mistaken,
that method only iterates over the data in the 'active' queue, not data
that is swapped out. As a result, you could have the active queue filled
up with data for nodes that are not pulling, and that would prevent any
node from pulling data.
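
To make the blocking scenario above concrete, here is a toy simulation.
It is not NiFi code: the Connection class, the limit of 3, and the
swap-in rule are all simplifications I'm assuming for illustration, and
the host names are made up.

```python
from collections import deque

ACTIVE_QUEUE_LIMIT = 3  # hypothetical; stands in for the swap threshold


class Connection:
    """Toy model of a queue with an in-memory 'active' part and a swapped-out part."""

    def __init__(self, limit=ACTIVE_QUEUE_LIMIT):
        self.limit = limit
        self.active = deque()
        self.swapped = deque()

    def enqueue(self, flowfile):
        # Once the active queue is full, new FlowFiles are swapped out.
        if len(self.active) < self.limit:
            self.active.append(flowfile)
        else:
            self.swapped.append(flowfile)

    def get(self, accept):
        """Mimic a filtered get that only scans the active queue."""
        matched = [ff for ff in self.active if accept(ff)]
        for ff in matched:
            self.active.remove(ff)
        # Assumed rule: swapped data comes back only when the active queue has room.
        while self.swapped and len(self.active) < self.limit:
            self.active.append(self.swapped.popleft())
        return matched


def pull_for(conn, host):
    """Pull every active FlowFile whose s2s.host attribute matches the host."""
    return conn.get(lambda ff: ff["s2s.host"] == host)


conn = Connection()
for i in range(3):
    conn.enqueue({"id": i, "s2s.host": "site-a"})  # site-a never pulls
for i in range(3, 6):
    conn.enqueue({"id": i, "s2s.host": "site-b"})  # these end up swapped out

first = pull_for(conn, "site-b")
second = pull_for(conn, "site-b")
print(len(first), len(second), len(conn.swapped))
```

In this model site-b gets nothing no matter how often it pulls, because
its FlowFiles sit behind site-a's data and never reach the active queue.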

Even if we were to change the get(FlowFileFilter) method so that it also
runs through swapped-out data, the disk I/O needed to constantly scan the
swap files would likely make this approach cost-prohibitive. To make it
feasible you'd probably also have to change the Swap File format so that
its "summary" also contains a mapping of s2s.host to the count of
FlowFiles for that host. And this is already getting way beyond the
scope, I think, of what you want to do here.
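
As a rough sketch of what such a per-host summary would buy you: the
structure below is purely hypothetical (it is not the actual Swap File
format, and the function names are mine), but it shows how a host-to-count
mapping would let a pull skip swap files that hold nothing for the
requesting host.

```python
from collections import Counter


def summarize(swap_file):
    """Build a hypothetical summary: FlowFile count per s2s.host."""
    return Counter(ff["s2s.host"] for ff in swap_file)


def files_worth_reading(summaries, host):
    """Return names of swap files whose summary lists data for the host."""
    return [name for name, counts in summaries.items() if counts.get(host, 0) > 0]


# Made-up swap file contents, reduced to the one attribute that matters here.
swap_files = {
    "swap-1": [{"s2s.host": "site-a"}, {"s2s.host": "site-a"}],
    "swap-2": [{"s2s.host": "site-a"}, {"s2s.host": "site-b"}],
}
summaries = {name: summarize(ffs) for name, ffs in swap_files.items()}
candidates = files_worth_reading(summaries, "site-b")
print(candidates)
```

Only swap-2 would need to be read for site-b. The summaries would still
have to be kept up to date on every swap-out and swap-in, which is part
of why I think this goes beyond the intended scope.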

Additionally, I feel like where this concept is heading is difficult to
explain and is designed for a rather specific use case, because it starts
to make this into a sort of quasi-pub/sub mechanism but not a true
pub/sub.

Rather, I would propose that when the desire is to push data to a
specific NiFi node, the preferred approach is not to use Site-to-Site (as
that's intended to be point-to-point between NiFi instances/clusters for
well-established endpoints). Typically, the approach taken for a scenario
like this would be to have a ListenHTTP processor run on each of the
standalone instances. They can push to the central instance using
Site-to-Site. Then, rather than using an Output Port, you'd use a
PostHTTP processor to push the data back. PostHTTP already supports
Expression Language for the URL, and it has a "Send as FlowFile" option
that properly packages the FlowFiles together with their attributes. It
also handles batching together small FlowFiles, supports two-phase commit
to minimize the possibility of data duplication, etc. This was the method
that was used before Site-to-Site was added, and it worked quite well for
a long time. Site-to-Site was added for convenience so that users could
just point to a given URL, be provided the list of available ports, and
have it auto-load balance across the cluster (if applicable). But in your
use case, neither of these really benefits you, because you don't know
the URL to send to a priori and you already know exactly which node to
push to.
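
To show the idea of routing by attribute, here is a toy stand-in for how
an Expression Language URL could resolve per FlowFile. This is not NiFi's
EL implementation; the port, the path, and the host name are assumptions,
and resolve() is a helper I made up for the example.

```python
import re

# Hypothetical value for the PostHTTP "URL" property; port and path are assumed.
URL_TEMPLATE = "http://${s2s.host}:8081/contentListener"


def resolve(template, attributes):
    """Toy stand-in for Expression Language: replace ${name} with the attribute value."""
    def lookup(match):
        # Raises KeyError if the attribute is missing; real EL behaves differently.
        return attributes[match.group(1)]
    return re.sub(r"\$\{([^}]+)\}", lookup, template)


flowfile_attributes = {"s2s.host": "site-b.example.com"}
target = resolve(URL_TEMPLATE, flowfile_attributes)
print(target)
```

Each FlowFile carries the host it came from, so a single PostHTTP
processor on the central cluster could address every standalone instance
without one port or one RPG per site.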

Thanks
-Mark




> On Sep 15, 2018, at 9:05 AM, Pierre Villard <pierre.villard...@gmail.com> wrote:
> 
> Hi all,
> 
> Here is my use case: I've multiple NiFi standalone instances deployed over
> multiple sites (that could be MiNiFi instances) and a central NiFi
> cluster.  The standalone instances generate data, the data is sent to the
> central cluster to be parsed and enriched before being sent back to the
> standalone instances. The data needs to go back where it's been generated.
> 
> At the moment, since RPG cannot be configured using EL and FFs attributes,
> you need to have one port (or one RPG if the RPG is on central NiFi's side)
> per standalone instance. And I don't think that changing the RPG to handle
> FFs attributes scope would be a good idea in terms of implementation.
> 
> Instead I'd like to change the S2S protocol to allow RPG pulling based on
> FFs attributes.
> 
> On the standalone instances, we would have:
> Workflow generating data => RPG => workflow receiving enriched data from
> central cluster
> 
> On the NiFi cluster, we would have:
> input port => workflow parsing and enriching data => output port
> 
> The idea would be that, when configuring an output port in the RPG, it'd be
> possible to enable "host based pulling" so that only flow files having the
> attribute 's2s.host' matching the host of the instance hosting the RPG
> would be pulled. (the s2s.port attribute is already set when data is sent
> through S2S).
> 
> I already started working on that approach and even though I don't have
> something fully functional yet, I wanted to discuss it here to be sure this
> would be interesting for the wider community and, also, if I'm not missing
> something obvious that would prevent it.
> 
> Happy to file a JIRA if that sounds interesting.
> 
> Thanks,
> Pierre
