Hi Janek,
I think you hit a deficiency in the FlinkRunner's SDF implementation.
AFAIK the runner is unable to do dynamic splitting, which is what you
are probably looking for. What you describe essentially works in the
model, but the FlinkRunner does not implement the complete contract
needed to split a large file into multiple parts and process them
in parallel. I'm afraid there is currently no simple solution other
than what you described. Dynamic splitting might be introduced in a
future release, so that it starts working as you expect out of the
box. This mostly affects users with a few large files; if you can
parallelize on the files themselves, it should work fine (which is what
you observe).
Jan
On 3/9/22 16:44, Janek Bevendorff wrote:
I went through all Flink and Beam documentation I could find to see if
I overlooked something, but I could not get the text input source to
unfuse the file input splits. This creates a huge input bottleneck,
because one worker is busy reading records from a huge input file
while 99 others wait for input and I can only shuffle the generated
records, not the actual file input splits.
To fix it, I wrote a custom PTransform that globs files, optionally
shuffles the file names, generates fixed-size split markers, shuffles
the markers, and then reads the lines from these splits. This works
well, but it feels like a hack, because I think Beam should be doing
that out of the box. My implementation of the file reader is also much
simpler and relies on IOBase.readline(), which keeps the code short
and concise, but also loses a lot of flexibility compared to the Beam
file reader implementation (such as support for custom line delimiters).
Any other ideas on how I can solve this without writing custom PTransforms?
Janek
On 08/03/2022 14:11, Janek Bevendorff wrote:
Hey there,
According to the docs, when using a FileBasedSource or a splittable
DoFn, the runner is free to initiate splits that can be run in
parallel. As far as I can tell, the former is actually happening on
my Apache Flink cluster, but the latter is not. This causes a single
TaskManager to process all splits of an input text file. Is this
known behaviour, and how can I fix it?
I have a pipeline that looks like this (Python SDK):
(pipeline
 | 'Read Input File' >> textio.ReadFromText(input_glob, min_bundle_size=1)
 | 'Reshuffle Lines' >> beam.Reshuffle()
 | 'Map Records' >> beam.ParDo(map_func))
The input file is a large, uncompressed plaintext file from a shared
drive containing millions of newline-separated data records. I am
running this job with a parallelism of 100, but it is bottlenecked by
a single worker running ReadFromText(). The reshuffling in between
was added to force Beam/Flink to parallelize the processing, but this
has no effect on the preceding stage. Only the following map
operation is being run in parallel. The stage itself is marked as
having a parallelism of 100, but 99 workers finish immediately.
I had the same issue earlier with another input source, in which I
match a bunch of WARC file globs and then iterate over them in a
splittable DoFn. I solved the missing parallelism by adding an
explicit reshuffle in between matching input globs and actually
reading the individual files:
class WarcInput(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | MatchFiles(self._file_pattern)
                | beam.Reshuffle()
                | beam.ParDo(WarcReader()))
This way I can at least achieve parallelism on file level. This
doesn't work with a single splittable input file, of course, for
which I would have to reshuffle somewhere inside of ReadFromText().
Do I really have to write a custom PTransform that generates initial
splits, shuffles them, and then reads from those splits? I consider
this somewhat essential functionality.
Any hints appreciated.
Janek