There are two "kinds" of splits in SDF - one splits the restriction
*before* it is processed and the other *during* processing. The first
one is supported (it is needed for correctness); the other is, in the
bounded case, only an optimization, and it is not currently supported.
It seems to me that it should be possible to pre-split on filenames,
which could then be processed in parallel. Unfortunately I'm not that
familiar with the Python SDK's fileio, so I'd rather leave the more
detailed answer to someone else. But otherwise what you say makes sense
to me.
Jan
On 3/9/22 17:26, Janek Bevendorff wrote:
Thanks for the response! That's what I feared was going on.
I consider this a huge shortcoming, particularly because it affects
not only users with large files, as you said. The same happens with
many small files, because file globs are also fused to a single
worker. The only way to process the files in parallel is to write a
PTransform that does MatchFiles(file_glob) | Reshuffle() |
ReadAllFromText(). A simple ReadFromText(file_glob) will not run in
parallel.
In fact, if you feed multiple text file sources to your job, not only
will each of these inputs process its files on one worker, but the
inputs themselves are also fused together. So even if you resolved the
glob locally and then added one input for each individual file, all of
that would still run sequentially.
On 09/03/2022 17:14, Jan Lukavský wrote:
Hi Janek,
I think you hit a deficiency in the FlinkRunner's SDF implementation.
AFAIK the runner is unable to do dynamic splitting, which is probably
what you are looking for. What you describe essentially works in the
model, but the FlinkRunner does not implement the complete contract,
so it cannot make use of the ability to split a large file into
multiple parts and process them in parallel. I'm afraid there is
currently no simple solution other than what you described. Dynamic
splitting might be introduced in some future release, so that it
starts working as you expect out of the box. This should mostly affect
users with a few large files; if you can parallelize on the files
themselves, it should work fine (which is what you observe).
Jan
On 3/9/22 16:44, Janek Bevendorff wrote:
I went through all Flink and Beam documentation I could find to see
if I overlooked something, but I could not get the text input source
to unfuse the file input splits. This creates a huge input
bottleneck, because one worker is busy reading records from a huge
input file while 99 others wait for input and I can only shuffle the
generated records, not the actual file input splits.
To work around it, I wrote a custom PTransform that globs files,
optionally shuffles the file names, generates fixed-size split
markers, shuffles the markers, and then reads the lines from these
splits.
This works well, but it feels like a hack, because I think Beam
should be doing that out of the box. My implementation of the file
reader is also much simpler and relies on IOBase.readline(), which
keeps the code short and concise, but also loses a lot of
flexibility compared to the Beam file reader implementation (such as
support for custom line delimiters).
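To illustrate the split-marker idea (this is my own sketch, not the
actual implementation from this thread; the helper names are
hypothetical and it assumes UTF-8 text with '\n' delimiters), fixed-size
byte ranges can be turned into clean line-level splits with the usual
convention: a line belongs to the split in which it starts, so each
reader resynchronizes to the next line boundary and reads past its end
offset to finish the last line it started:

```python
def split_markers(file_size, split_size):
    """Fixed-size (start, end) byte-range markers covering the file."""
    return [(start, min(start + split_size, file_size))
            for start in range(0, file_size, split_size)]


def read_lines_from_split(f, start, end):
    """Yield the lines 'owned' by the byte range [start, end) of a
    binary file object. A line belongs to the split it starts in."""
    if start == 0:
        f.seek(0)
    else:
        # Seek one byte back and discard up to the next newline. This
        # also handles a line starting exactly at `start`: the
        # discarded read then consumes only the preceding newline.
        f.seek(start - 1)
        f.readline()
    # Read every line that starts before `end`, even if it ends after.
    while f.tell() < end:
        line = f.readline()
        if not line:
            break
        yield line.decode('utf-8').rstrip('\n')
```

In the pipeline, the markers become elements that are reshuffled, and a
downstream DoFn opens the file and calls read_lines_from_split() for
its assigned range.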
Any other ideas on how I can solve this without writing custom
PTransforms?
Janek
On 08/03/2022 14:11, Janek Bevendorff wrote:
Hey there,
According to the docs, when using a FileBasedSource or a splittable
DoFn, the runner is free to initiate splits that can be run in
parallel. As far as I can tell, the former is actually happening on
my Apache Flink cluster, but the latter is not. This causes a
single TaskManager to process all splits of an input text file. Is
this known behaviour, and how can I fix it?
I have a pipeline that looks like this (Python SDK):
(pipeline
 | 'Read Input File' >> textio.ReadFromText(input_glob, min_bundle_size=1)
 | 'Reshuffle Lines' >> beam.Reshuffle()
 | 'Map Records' >> beam.ParDo(map_func))
The input file is a large, uncompressed plaintext file from a
shared drive containing millions of newline-separated data records.
I am running this job with a parallelism of 100, but it is
bottlenecked by a single worker running ReadFromText(). The
reshuffling in between was added to force Beam/Flink to parallelize
the processing, but this has no effect on the preceding stage. Only
the following map operation is being run in parallel. The stage
itself is marked as having a parallelism of 100, but 99 workers
finish immediately.
I had the same issue earlier with another input source, in which I
match a bunch of WARC file globs and then iterate over them in a
splittable DoFn. I solved the missing parallelism by adding an
explicit reshuffle between matching the input globs and actually
reading the individual files:
class WarcInput(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | MatchFiles(self._file_pattern)
                | beam.Reshuffle()
                | beam.ParDo(WarcReader()))
This way I can at least achieve parallelism at the file level. This
doesn't work with a single splittable input file, of course, for
which I would have to reshuffle somewhere inside ReadFromText().
Do I really have to write a custom PTransform that generates
initial splits, shuffles them, and then reads from those splits? I
consider this fairly essential functionality.
Any hints appreciated.
Janek