Hello,

We have a use case, and it's not clear to me how it can be implemented with
Beam. I'm counting on the community's help here; maybe I'm missing something
obvious.

Let's say there are two different bounded sources and one join transform (say,
GBK) downstream. This join behaves like an INNER JOIN: it joins elements of the
two collections only if they share a common key (it could be any other join
logic; that doesn't matter). What matters is that the join has to return only N
records as output, and then we have to stop the pipeline once they have been
processed. In the best case, this means we need to read only N records from
each source, join them, pass them downstream, and then stop the pipeline.
Otherwise, if some records have no matching key in the other collection, we
need to read another batch of records and check whether that is enough to
produce N joined records.

Here is a simple example. Say each source contains 1M records, but after the
join we need only 1K joined records. We don't want to read all two million
records from the two sources if we can produce the output after reading far
fewer. So 1K joined records is the stop condition.

     1M
+----------+
| Source 1 |----+
+----------+    |     +------+
                +---->| Join |----> Output 1K and stop
     1M         |     +------+
+----------+    |
| Source 2 |----+
+----------+

So it looks like I need the ability to read a new portion of data "on demand",
or some kind of back-pressure mechanism that signals from downstream to
upstream: "please give me only N elements and then wait until I ask for more".
I'm not sure that Beam supports something like this.
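
To make the desired behaviour concrete, here is a minimal sketch in plain
Python. It is not Beam code and does not use any Beam API; it only illustrates
the "read a batch, join, check the count, read more if needed" semantics
described above. The function name limited_inner_join and the parameters limit
and batch_size are hypothetical names chosen for this illustration, and both
sources are assumed to be iterables of (key, value) pairs.

```python
from itertools import islice


def limited_inner_join(left, right, limit, batch_size):
    """Pull batches from both sources until `limit` joined records exist.

    Returns (joined_records, records_read). Illustration only: this is plain
    Python, not a Beam transform.
    """
    left_it, right_it = iter(left), iter(right)
    left_seen = {}   # key -> values read so far from the left source
    right_seen = {}  # key -> values read so far from the right source
    joined = []
    records_read = 0

    while len(joined) < limit:
        # "On demand": ask each source for at most one more batch.
        left_batch = list(islice(left_it, batch_size))
        right_batch = list(islice(right_it, batch_size))
        if not left_batch and not right_batch:
            break  # both sources are exhausted before reaching the limit
        records_read += len(left_batch) + len(right_batch)

        # Match new left records against right records from earlier batches.
        for key, value in left_batch:
            for other in right_seen.get(key, []):
                joined.append((key, value, other))
            left_seen.setdefault(key, []).append(value)

        # Match new right records against all left records read so far,
        # including the current left batch, so each pair appears once.
        for key, value in right_batch:
            for other in left_seen.get(key, []):
                joined.append((key, other, value))
            right_seen.setdefault(key, []).append(value)

    # Stop condition reached: emit at most `limit` records.
    return joined[:limit], records_read
```

With two lazy sources of 1M records each whose keys all match, a call with
limit=1000 and batch_size=1000 reads only the first 1000 records from each
source. If the keys only partially overlap, it keeps pulling further batches
until 1000 joined records are available or both sources are exhausted.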

One idea I tried was to split the initial inputs into fixed windows with the
trigger "AfterPane.elementCountAtLeast(N)" to read the data in limited batches,
and to use another "AfterPane.elementCountAtLeast(N)" after the join, which
should fire only once. It doesn't work, and in any case I guess it still won't
read data "on demand" or stop the whole pipeline.

Do you think this is feasible in Beam?
Any ideas or advice are very welcome!

