Hello,

A use case I run into frequently is the following: I have daily 
or hourly files, and a Beam pipeline with small to moderate-size windows. 
(Actually, I've just seen that support for per-window files in file-based 
sinks was recently checked in, which is one way to get there.)

Now, Beam has no clue about the fact that each file corresponds to a given time 
interval. My understanding is that when running the pipeline in batch mode with 
a bounded source, there is no notion of a watermark and we have to load everything 
because we cannot tell when a window is complete. This is pretty wasteful, 
especially as you have to keep a lot of data in memory, while you could in 
principle operate close to what you'd do in streaming mode: read the oldest 
files first, then the newer ones, moving the watermark forward as you go 
through the input list of files.

I see one way around this. Let's say that I have hourly files and let's not 
assume anything about the order of records within the file to keep it simple: I 
don't want a very precise record-level watermark, but more a rough watermark at 
the granularity of hours. Say we can easily get the corresponding time interval 
from the filename. One can make an unbounded source that essentially acts as a 
"List of bounded file-based sources". If there are K splits, split k can read 
every file that has `index % K == k` in the time-ordered list of files. 
`advance` can advance within the current file, and move on to the next one 
once no more records can be read from it.
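
To make the scheme concrete, here is a minimal sketch in plain Java with no Beam dependency. Everything in it is hypothetical and only for illustration: the `HourlyFile` class, the `filesForSplit` and `watermark` helpers, and the use of `Instant.MAX` as a stand-in for an "end of time" watermark. It assumes each file only contains records for its own hour, so the start of the hour of the file currently being read is a safe lower bound for that split.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class HourlyFileSplits {

    /** Hypothetical descriptor: a file name plus the start of the hour it covers. */
    static final class HourlyFile {
        final String name;
        final Instant hourStart;

        HourlyFile(String name, Instant hourStart) {
            this.name = name;
            this.hourStart = hourStart;
        }
    }

    /**
     * Returns the files that split k reads: every file whose position in the
     * time-ordered list satisfies index % numSplits == k, still in time order.
     */
    static List<HourlyFile> filesForSplit(List<HourlyFile> timeOrdered, int k, int numSplits) {
        List<HourlyFile> mine = new ArrayList<>();
        for (int index = k; index < timeOrdered.size(); index += numSplits) {
            mine.add(timeOrdered.get(index));
        }
        return mine;
    }

    /**
     * Rough, hour-granularity watermark for one split: the start of the hour of
     * the file currently being read. Once the split has no files left, this
     * returns Instant.MAX as a placeholder for "no more data will arrive".
     */
    static Instant watermark(List<HourlyFile> mine, int currentFile) {
        return currentFile < mine.size() ? mine.get(currentFile).hourStart : Instant.MAX;
    }
}
```

With five hourly files and K = 2, split 0 would read files 0, 2 and 4, and split 1 would read files 1 and 3. The watermark of each split only moves when its reader switches to the next file, which is the rough granularity described above.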

However, as far as I understand, this pipeline will never terminate since this 
is an unbounded source and having the `advance` method of our wrapping source 
return `false` won't make the pipeline terminate. Can someone confirm if this 
is correct? If so, what would be ways to work around that? There's always the 
option to throw an exception to make the pipeline fail, but this is far from ideal.

Thanks,
