[ https://issues.apache.org/jira/browse/BEAM-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-9228 started by Hannah Jiang.
------------------------------------------
> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> --------------------------------------------------------------------
>
>                 Key: BEAM-9228
>                 URL: https://issues.apache.org/jira/browse/BEAM-9228
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.16.0, 2.18.0, 2.19.0
>            Reporter: Hannah Jiang
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.20.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -------------------------------------------------
> I have a set of tfrecord files, obtained by converting parquet files with
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (i.e. counting the number of items
> in all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below:
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
>
> # Ask the direct runner for 4 workers.
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> # Multiprocess mode: each SDK worker runs in its own subprocess.
> runner = fn_api_runner.FnApiRunner(
>     default_environment=beam_runner_api_pb2.Environment(
>         urn=python_urns.SUBPROCESS_SDK,
>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>         % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> # Count all records across the matched files and write the total.
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>            | beam.combiners.Count.Globally()
>            | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker
> types)
> * beam 2.16: neither multithreaded nor multiprocess mode achieves high CPU
> usage on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: I have not tested the multithreaded mode, but the multiprocess
> mode fails when trying to serialize the Environment instance, most likely
> because of a change from 2.17 to 2.18.
> I also briefly tried the SparkRunner with version 2.16 but was not able to
> achieve any throughput.
> What is the recommended way to achieve what I am trying to do? How can I
> troubleshoot this?
> -------------------------------------------------
> This is caused by [this
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] was tried, which
> rolls back iobase.py so that it does not use _SDFBoundedSourceWrapper. This
> confirmed that data is then distributed to multiple workers; however, there
> are some regressions in the SDF wrapper tests.
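
For readers unfamiliar with the symptom, here is a toy model in plain Python. It is not Beam code and does not reproduce the actual runner logic. It only assumes, for illustration, that a source handed to the workers as a single work item ends up on one worker, while a source split into one work item per file is spread across all of them. The function name, the round-robin assignment, and the record counts are all hypothetical.

```python
from itertools import cycle


def assign_round_robin(work_items, num_workers):
    """Hand out work items to workers in turn; return records handled per worker.

    Each work item is a dict mapping a file name to its record count.
    This is a toy scheduler, not how any Beam runner assigns bundles.
    """
    load = {worker: 0 for worker in range(num_workers)}
    for worker, item in zip(cycle(range(num_workers)), work_items):
        load[worker] += sum(item.values())
    return load


# Hypothetical input: 11 files of 1000 records each (counts are made up).
files = {f"part-r-{i:05d}": 1000 for i in range(11)}

# Case 1: the whole source is a single, unsplit work item.
unsplit_load = assign_round_robin([files], num_workers=4)

# Case 2: the source is split into one work item per file.
split_load = assign_round_robin(
    [{name: count} for name, count in files.items()], num_workers=4)

print("unsplit:", unsplit_load)
print("split:  ", split_load)
```

In the unsplit case every record lands on worker 0 and the other three stay idle, which matches the single-core CPU usage described in the report. In the split case the 11 files are spread over all four workers.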