[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392212 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 25/Feb/20 00:28
Start Date: 25/Feb/20 00:28
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847

Issue Time Tracking
---

Worklog Id: (was: 392212)
Time Spent: 3h 20m (was: 3h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.16.0, 2.18.0, 2.19.0
> Reporter: Hannah Jiang
> Assignee: Hannah Jiang
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with
> Spark. Each file is roughly 1 GB and I have 11 of those.
> I would expect simple statistics gathering (i.e. counting the number of items
> across all files) to scale linearly with respect to the number of cores on my
> system.
> I am able to reproduce the issue with the minimal snippet below:
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
>
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner = fn_api_runner.FnApiRunner(
>     default_environment=beam_runner_api_pb2.Environment(
>         urn=python_urns.SUBPROCESS_SDK,
>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>                 % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>            | beam.combiners.Count.Globally()
>            | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker
> types):
> * beam 2.16: neither multithreaded nor multiprocess mode achieves high CPU
> usage on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: did not test the multithreaded mode, but the multiprocess mode
> fails when trying to serialize the Environment instance, most likely because
> of a change from 2.17 to 2.18.
> I also briefly tried the SparkRunner with version 2.16 but was not able to
> achieve any throughput.
> What is the recommended way to achieve what I am trying to do? How can I
> troubleshoot?
> --
> This is caused by [this
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] was tried: rolling
> back iobase.py so that it does not use _SDFBoundedSourceWrapper. This
> confirmed that data is distributed to multiple workers; however, it caused
> some regressions in the SDF wrapper tests.
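As context for the repro above: in later Beam releases the same multi-process local execution can be requested directly from the DirectRunner, without hand-building a subprocess Environment. Below is a minimal sketch, assuming a release (roughly 2.19+) where the `--direct_running_mode` pipeline option is available; it is an illustration, not the reporter's exact setup:

{code:python}
# Hypothetical repro sketch; assumes a Beam release where the DirectRunner
# exposes --direct_running_mode ('in_memory', 'multi_threading',
# 'multi_processing').
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--direct_num_workers', '4',
    '--direct_running_mode', 'multi_processing',
])
with beam.Pipeline(options=options) as p:
    _ = (p
         | 'read' >> beam.io.tfrecordio.ReadFromTFRecord('part-r-00*')
         | beam.combiners.Count.Globally()
         | beam.io.WriteToText('/tmp/output'))
{code}

If data is distributed correctly, CPU usage should scale across all four worker processes while the 11 files are read.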
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392208 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 25/Feb/20 00:07
Start Date: 25/Feb/20 00:07
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r383587744

File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py

{code}
@@ -1603,23 +1603,6 @@
   def restriction_size(self, element, restriction):
     return restriction.size()
 
-class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
{code}

Review comment: Is there any loss of coverage in removing these tests? I can see how perhaps they're too strict.

Issue Time Tracking
---

Worklog Id: (was: 392208)
Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392207 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 25/Feb/20 00:07
Start Date: 25/Feb/20 00:07
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r383587172

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -932,10 +994,14 @@ def get_buffer(buffer_id):
       return pcoll_buffers[buffer_id]
 
     def get_input_coder_impl(transform_id):
-      return context.coders[
-          safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-              process_bundle_descriptor.transforms[transform_id].spec.payload).
-              coder_id]].get_impl()
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload
+      ).coder_id
+      assert coder_id
+      if coder_id in safe_coders:
{code}

Review comment: An alternative would be `safe_coders.get(coder_id, coder_id)`, which looks up the key (first argument) and, if it is absent, returns the default (second argument). This would also eliminate some of the duplicated logic between the two branches.

Issue Time Tracking
---

Worklog Id: (was: 392207)
Time Spent: 3h 10m (was: 3h)
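To make the suggestion concrete, here is a minimal sketch of the `dict.get` refactor. The parameter names (`transforms`, `safe_coders`, `coders`) are simplified stand-ins for the runner's `process_bundle_descriptor.transforms`, safe-coder map, and coder registry; this is not the actual fn_api_runner code:

{code:python}
from apache_beam.portability.api import beam_fn_api_pb2

def get_input_coder_impl(transform_id, transforms, safe_coders, coders):
    # Read the coder id out of the transform's RemoteGrpcPort payload.
    coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
        transforms[transform_id].spec.payload).coder_id
    assert coder_id
    # dict.get(key, default): use the registered safe coder when present,
    # otherwise fall back to the original coder id -- no if/else needed.
    return coders[safe_coders.get(coder_id, coder_id)].get_impl()
{code}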
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392059 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 24/Feb/20 20:10
Start Date: 24/Feb/20 20:10
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-590525757

kindly ping. @robertwb

Issue Time Tracking
---

Worklog Id: (was: 392059)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390983 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 22/Feb/20 00:11
Start Date: 22/Feb/20 00:11
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381568266

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -994,7 +1069,13 @@ def input_for(transform_id, input_id):
       # The worker will be waiting on these inputs as well.
       for other_input in data_input:
         if other_input not in deferred_inputs:
-          deferred_inputs[other_input] = _ListBuffer([])
+          outputs = process_bundle_descriptor.transforms[
+              other_input].outputs.values()
+          coder_id = process_bundle_descriptor.pcollections[
+              only_element(outputs)].coder_id
+          coder = context.coders[coder_id]
+          deferred_inputs[other_input] = _ListBuffer(
+              coder_impl=coder.get_impl())
{code}

Review comment: As commented at L1082 (of the PR branch), deferred inputs cannot be processed in parallel for now. Would it be better to set coder_impl to None to avoid unnecessary work for now, and add it back later when parallel processing is supported for deferred_inputs?

Issue Time Tracking
---

Worklog Id: (was: 390983)
Time Spent: 2h 50m (was: 2h 40m)
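For readers following along, a condensed sketch of the coder lookup in the diff above. `coder_impl_for_output` is a hypothetical helper name, and the arguments are simplified stand-ins for the runner's bundle-descriptor protos and coder registry:

{code:python}
def coder_impl_for_output(transform_id, process_bundle_descriptor, coders):
    # Mirror of the diff's logic: find the single output PCollection of
    # `transform_id` and return the implementation of its coder.
    outputs = list(
        process_bundle_descriptor.transforms[transform_id].outputs.values())
    assert len(outputs) == 1, 'expected exactly one output PCollection'
    coder_id = process_bundle_descriptor.pcollections[outputs[0]].coder_id
    return coders[coder_id].get_impl()
{code}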
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390308 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 20/Feb/20 23:11
Start Date: 20/Feb/20 23:11
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-589401291

@robertwb, please take a look.

Issue Time Tracking
---

Worklog Id: (was: 390308)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390261 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 20/Feb/20 20:54
Start Date: 20/Feb/20 20:54
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-589303274

retest it please

Issue Time Tracking
---

Worklog Id: (was: 390261)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389654&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389654 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 19/Feb/20 21:55
Start Date: 19/Feb/20 21:55
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381568266

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -994,7 +1069,13 @@ def input_for(transform_id, input_id):
       # The worker will be waiting on these inputs as well.
       for other_input in data_input:
         if other_input not in deferred_inputs:
-          deferred_inputs[other_input] = _ListBuffer([])
+          outputs = process_bundle_descriptor.transforms[
+              other_input].outputs.values()
+          coder_id = process_bundle_descriptor.pcollections[
+              only_element(outputs)].coder_id
+          coder = context.coders[coder_id]
+          deferred_inputs[other_input] = _ListBuffer(
+              coder_impl=coder.get_impl())
{code}

Review comment: As commented at L1082 (of the PR branch), deferred inputs cannot be processed in parallel for now. Would it be better to set coder_impl to None to avoid unnecessary work for now, and add it back later when parallel processing is supported for deferred_inputs?

Issue Time Tracking
---

Worklog Id: (was: 389654)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389236 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381019006

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -932,10 +998,14 @@ def get_buffer(buffer_id):
       return pcoll_buffers[buffer_id]
 
     def get_input_coder_impl(transform_id):
-      return context.coders[
-          safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-              process_bundle_descriptor.transforms[transform_id].spec.payload).
-              coder_id]].get_impl()
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload).\
+          coder_id
+      assert coder_id is not None and coder_id != ''
{code}

Review comment: OK, in the SDF case we can get the coder of the preceding transform (the one that produces its input `ref_PCollection_PCollection_3_split`), which should always be a RemoteGrpcPort.

Issue Time Tracking
---

Worklog Id: (was: 389236)
Time Spent: 2h 10m (was: 2h)
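A minimal sketch of what "get the coder of the preceding transform" could look like. `producer_coder_id` is a hypothetical helper, not the actual fix from the PR; it assumes the producer of the given PCollection is a gRPC read/write node carrying a RemoteGrpcPort payload, as the comment above notes:

{code:python}
from apache_beam.portability.api import beam_fn_api_pb2

def producer_coder_id(pcoll_id, process_bundle_descriptor):
    # Walk the bundle descriptor to find the transform that produces
    # `pcoll_id`, then read the coder id from its RemoteGrpcPort payload.
    for transform in process_bundle_descriptor.transforms.values():
        if pcoll_id in transform.outputs.values():
            return beam_fn_api_pb2.RemoteGrpcPort.FromString(
                transform.spec.payload).coder_id
    raise KeyError('no producer found for %s' % pcoll_id)
{code}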
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389233 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381016780

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -287,43 +287,48 @@ def partition(self, n):
 
 class _ListBuffer():
   """Used to support parititioning of a list."""
-  def __init__(self, input_coder):
-    self._input_coder = input_coder
+  def __init__(self, coder_impl):
+    self._coder_impl = coder_impl
     self._inputs = []
     self._grouped_output = None
     self.cleared = False
 
   def append(self, element):
+    if self.cleared:
+      raise RuntimeError('Trying to append to a cleared ListBuffer.')
     if self._grouped_output:
       raise RuntimeError('ListBuffer append after read.')
     self._inputs.append(element)
 
   def partition(self, n):
     # type: (int) -> List[List[bytes]]
+    if self.cleared:
+      raise RuntimeError('Trying to partition a cleared ListBuffer.')
     if len(self._inputs) >= n or len(self._inputs) == 0:
       return [self._inputs[k::n] for k in range(n)]
     else:
       if not self._grouped_output:
-        self._grouped_output = [[] for _ in range(n)]
-        coder_impl = self._input_coder.get_impl()
-        decoded_input = []
-        output_stream_list = []
-        for _ in range(n):
-          output_stream_list.append(create_OutputStream())
+        output_stream_list = [create_OutputStream() for _ in range(n)]
+        self._grouped_output = [output_stream.get() for output_stream
{code}

Review comment: I meant you could put this instead of the loop at line 324.

Issue Time Tracking
---

Worklog Id: (was: 389233)
Time Spent: 2h (was: 1h 50m)
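To illustrate the partitioning behavior under review: when at least `n` elements are buffered, `_ListBuffer.partition` splits them round-robin with simple slicing. Below is a self-contained sketch; `ListBufferSketch` is a simplified stand-in for the runner's coder-aware class (which can additionally decode and re-encode fewer-than-n elements to split them further):

{code:python}
from typing import List

class ListBufferSketch:
    def __init__(self):
        self._inputs: List[bytes] = []
        self.cleared = False

    def append(self, element: bytes) -> None:
        if self.cleared:
            raise RuntimeError('Trying to append to a cleared buffer.')
        self._inputs.append(element)

    def partition(self, n: int) -> List[List[bytes]]:
        # Round-robin slices: element k goes to partition k % n. This only
        # balances work when there are >= n buffered elements; the real
        # implementation handles the fewer-than-n case separately.
        if self.cleared:
            raise RuntimeError('Trying to partition a cleared buffer.')
        return [self._inputs[k::n] for k in range(n)]

buf = ListBufferSketch()
for i in range(10):
    buf.append(b'element-%d' % i)
print(buf.partition(4))  # four round-robin slices of the buffered elements
{code}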
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389235&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389235 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381018471

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -932,10 +998,14 @@ def get_buffer(buffer_id):
       return pcoll_buffers[buffer_id]
 
     def get_input_coder_impl(transform_id):
-      return context.coders[
-          safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-              process_bundle_descriptor.transforms[transform_id].spec.payload).
-              coder_id]].get_impl()
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload).\
{code}

Review comment: Lint: don't use backslashes for continuation. (These days you can just run yapf to format your code; see https://cwiki.apache.org/confluence/display/BEAM/Python+Tips.)

Issue Time Tracking
---

Worklog Id: (was: 389235)
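For reference, the two continuation styles the lint comment contrasts, using the names from the quoted diff. Expressions inside parentheses continue implicitly across lines, so no backslash is needed:

{code:python}
# Backslash continuation (discouraged by the Beam style guide):
coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
    process_bundle_descriptor.transforms[transform_id].spec.payload).\
    coder_id

# Implicit continuation inside parentheses (preferred):
coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
    process_bundle_descriptor.transforms[transform_id].spec.payload
).coder_id
{code}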
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389234 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381017895

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -769,7 +774,7 @@ def _collect_written_timers_and_add_to_deferred_inputs(
       for windowed_key_timer in timers_by_key_and_window.values():
         windowed_timer_coder_impl.encode_to_stream(
             windowed_key_timer, out, True)
-      deferred_inputs[transform_id] = _ListBuffer(input_coder=coder)
+      deferred_inputs[transform_id] = _ListBuffer(coder_impl=coder.get_impl())
{code}

Review comment: You can now revert the changes up at the top of the loop.

Issue Time Tracking
---

Worklog Id: (was: 389234)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=387774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387774 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 15/Feb/20 00:59
Start Date: 15/Feb/20 00:59
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379704037

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -932,10 +998,14 @@ def get_buffer(buffer_id):
       return pcoll_buffers[buffer_id]
 
     def get_input_coder_impl(transform_id):
-      return context.coders[
-          safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-              process_bundle_descriptor.transforms[transform_id].spec.payload).
-              coder_id]].get_impl()
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload).\
+          coder_id
+      assert coder_id is not None and coder_id != ''
{code}

Review comment: `coder_id` can be retrieved from the transform payload in most cases, except in the following one, which returns an empty (`''`) coder_id. This causes the `test_checkpoint_sdf` test to fail. How should we handle this case? The other SDF tests should be fixed now.

{code}
spec {
  urn: "beam:transform:sdf_process_sized_element_and_restrictions:v1"
  payload: "\n\215\014\n beam:dofn:pickled_python_info:v1\032\350\013eNp9ldt31FQUxnNmBihpKSCorRdE8TLVOlBaLkXFy0AFI0NNEaJSYiY50wQyyXzJCVKcEVAzxfv9vf0XXcu1fPDBfc5MVy9QXmbNuezz/fa39zm5Wyy7XhCGFVv+6rabcEdwWyy2uA5t41oYO15/hVklTdPkfxRyFE2UrCGaOBtlTZ7QAXNeA9tcp+W4Prfr3GlWROJEaSNOmmnFjROun4lnIh3bc+zoYKBs6RRt283Yy0Ju29hpDfQmnCCioW7toGEriV2ephh8GHEji1wRxHTmUNlnKrwae/ySRNyVY9jE7rJRNDRjm1EyJqszC1qn2C4sayvaddYp3Rrn/4X2qUVlpxqF5eLbdYuNQorjGn3teXSisa0d8tgJrVPCnitau3iZZjxtTsOemsG62Fu29pGgmyUJj4Sd8FQkgcLBY9Y2WkmFkwjss3ZK15JF2w2doIn91nYaO60Wjzw8rhaDiPJp0iF4QuDJsrI55WEDI8oCHvYWR5XeOh2b/HVv8ARPqX0itnkzEHjaKva48IzAs0ou9RqV1iIO+IMXNFYd1kplprMhNswG2FABz1m712RsN84iQYcezPH8mMALJg65tl3PglDIuuirBdL9wUcW2zbXSGXhZ53Eaep4ceweXurg5bK1f1M2VOqbgUfKr/ha2S9Ze9c31+zqYvmRous0VyN0jOV4tYPXyv5OX1ceBlEgAifcULNxn3qI2qVA7VKgdpmrDgltmXmszVYKVPTXa4bWReUB6n4NUhy2BmnpYqORcmE60QLHkS4m/BF/tIuj/gF/3Pi7WtAYJsfI1SkTx7ZwdbxW6+B4TeCEiZPWsCxl/372qz29gbRIpLokJU5GnKdqOd7wD1sjazBrtJf6R7zZxVv+iAJe78HpLt4m0mnjH0X6jiR918R7W5BOS9IqkZ4xcbbX8q2QGnCmDyhvXtFgBLi9XViQcO8T3Lkx6Yl/egnnSWvG+Je0CvhAahkmPtxCa0ZqXSCtmomL1p5NRUiD2xyzG2QLJDvQu+wk/BEJm72LJbfO5bi0SvExUcwa/6mML0uKKyasLShmJcUnRPGpic/UpbNtL6bNuFqzdqlhGsYicpo8pcn5eVzLBGwTn1PXO+qBaslrYAce6tYhGq5vWLlSfqDjx+Bm9Rzewx5Al9MAvJ84o8QZJV6tDgitrY3KxBuU+II1KtFagXsj5J696aKn8HMEyhvJjes5bqx/NMJedBb14jeHo3nhGlPmRdK82ERrC/Oa0jyQeYmJ1DpIh04frfOTE8ecqfqkNzVVn6jzqSMnjx6ZPO5OT7j82AREjszEzRxfmLiVY7GD2xT/pYm2f7Xmk70d1cPqA2X7QSRSfPUwn2RinshaIX3G7qiQ8xflV+KcCrmrZoKolQl1Uop76ssWZ2Jt6mvlSZwEC0GEb5bwrcpgw1tE+xRDxeP0GDkiJm/zJXRNLNVq87i/hO9MfE8N8YOJH6khfurg5wf4f/HV5l+X8JuJ39P6PP7o4M8apf1X5X8g4Xjr0\001:\037ref_Coder_FastPrimitivesCoder_4"
}
inputs {
  key: "0"
  value: "ref_PCollection_PCollection_3_split"
}
outputs {
  key: "None"
  value: "ref_PCollection_PCollection_4"
}
unique_name: "SDF/Process"
environment_id: "ref_Environment_default_environment_1"
{code}
Issue Time Tracking
---

Worklog Id: (was: 387774)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386983 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 14/Feb/20 00:18
Start Date: 14/Feb/20 00:18
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379191540

File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

{code}
@@ -907,6 +961,12 @@ def get_buffer(buffer_id):
     if kind in ('materialize', 'timers'):
       # If `buffer_id` is not a key in `pcoll_buffers`, it will be added by
       # the `defaultdict`.
+      if buffer_id not in pcoll_buffers:
+        coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+            process_bundle_descriptor.transforms[transform_id].spec.payload
+        ).coder_id
+        coder = context.coders[coder_id]
{code}

Review comment: I have checked the code for safe_coders, but I still don't have a clear idea about it. What is its purpose? How do we handle coder_ids that are not part of safe_coders? Previously, I tried using safe_coders when the coder_id is in safe_coders, and returning `coders[coder_id]` otherwise. That also passes the tests; is it an option here?

Issue Time Tracking
---

Worklog Id: (was: 386983)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386809&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386809 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:44
Start Date: 13/Feb/20 18:44
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379048737

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -937,6 +997,12 @@ def get_input_coder_impl(transform_id):
           process_bundle_descriptor.transforms[transform_id].spec.payload).
           coder_id]].get_impl()

+    def get_input_coder(transform_id):
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload
+      ).coder_id
+      return context.coders[coder_id]

Review comment: This might fix your SDF issue as well. Alternatively, using get_input_coder_impl everywhere might also fix things.

Issue Time Tracking
---
Worklog Id: (was: 386809)
Time Spent: 1.5h (was: 1h 20m)
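If both helpers are kept, one can be derived from the other so the RemoteGrpcPort payload is parsed in only one place. A sketch under the assumption that `process_bundle_descriptor` and `context` are the closure variables from fn_api_runner.py; note the real `get_input_coder_impl` also routes through `safe_coders`, which this sketch omits.

{code:python}
from apache_beam.portability.api import beam_fn_api_pb2

def get_input_coder(transform_id):
  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
      process_bundle_descriptor.transforms[transform_id].spec.payload
  ).coder_id
  return context.coders[coder_id]

def get_input_coder_impl(transform_id):
  # Derived from the coder-returning helper rather than re-parsing
  # the payload.
  return get_input_coder(transform_id).get_impl()
{code}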
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386797 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379039482

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -285,11 +285,50 @@ def partition(self, n):
     pass


-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+    self._input_coder = input_coder
+    self._inputs = []
+    self._grouped_output = None
+    self.cleared = False
+
+  def append(self, element):
+    if self._grouped_output:
+      raise RuntimeError('ListBuffer append after read.')
+    self._inputs.append(element)
+
   def partition(self, n):
     # type: (int) -> List[List[bytes]]
-    return [self[k::n] for k in range(n)]
+    if len(self._inputs) >= n or len(self._inputs) == 0:
+      return [self._inputs[k::n] for k in range(n)]
+    else:
+      if not self._grouped_output:
+        self._grouped_output = [[] for _ in range(n)]
+        coder_impl = self._input_coder.get_impl()
+        decoded_input = []
+        output_stream_list = []
+        for _ in range(n):
+          output_stream_list.append(create_OutputStream())
+        for input in self._inputs:
+          input_stream = create_InputStream(input)
+          while input_stream.size() > 0:
+            decoded_value = coder_impl.decode_from_stream(input_stream, True)
+            decoded_input.append(decoded_value)
+        for idx, v in enumerate(decoded_input):
+          coder_impl.encode_to_stream(v, output_stream_list[idx % n], True)
+        for ix, output_stream in enumerate(output_stream_list):
+          self._grouped_output[ix] = [output_stream.get()]
+        decoded_input = None
+      return self._grouped_output
+
+  def __iter__(self):
+    return itertools.chain(*self.partition(1))

Review comment: Can you simply return `iter(self._inputs)`?

Issue Time Tracking
---
Worklog Id: (was: 386797)
Time Spent: 1h 10m (was: 1h)
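The simplification suggested in the review comment above, as a minimal self-contained sketch (the class is trimmed to the parts relevant to iteration; names follow the PR diff). Since `self._inputs` keeps the original encoded pages even after a regrouping `partition()` call, iterating it directly yields the same elements as `itertools.chain(*self.partition(1))` without any decode/re-encode work.

{code:python}
class _ListBuffer(object):
  def __init__(self, input_coder):
    self._input_coder = input_coder
    self._inputs = []

  def append(self, element):
    self._inputs.append(element)

  def __iter__(self):
    # Iterate the raw encoded pages directly instead of routing
    # through partition(1).
    return iter(self._inputs)
{code}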
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386800&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386800 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379042596

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -728,28 +769,33 @@ def _collect_written_timers_and_add_to_deferred_inputs(
       for windowed_key_timer in timers_by_key_and_window.values():
         windowed_timer_coder_impl.encode_to_stream(
             windowed_key_timer, out, True)
-      deferred_inputs[transform_id] = _ListBuffer([out.get()])
-      written_timers[:] = []
+      deferred_inputs[transform_id] = _ListBuffer(input_coder=coder)
+      deferred_inputs[transform_id].append(out.get())
+      written_timers.clear()

   def _add_residuals_and_channel_splits_to_deferred_inputs(
       self,
       splits,  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
-      get_input_coder_callable,
+      get_input_coder_impl_callable,
       input_for_callable,
       last_sent,
-      deferred_inputs  # type: DefaultDict[str, PartitionableBuffer]
+      deferred_inputs,  # type: DefaultDict[str, PartitionableBuffer]
+      get_input_coder_callable

Review comment: This is redundant with get_input_coder_impl_callable: either take only the coder-giving one (and call get_impl yourself), or let _ListBuffer take a coder_impl in its construction (which may simplify things elsewhere).

Issue Time Tracking
---
Worklog Id: (was: 386800)
Time Spent: 1h 20m (was: 1h 10m)
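A sketch of the second option the reviewer mentions: constructing _ListBuffer with a coder impl rather than a coder. The class body is trimmed and illustrative, not the PR's actual code.

{code:python}
class _ListBuffer(object):
  def __init__(self, coder_impl):
    # Storing the impl directly means partition() never needs to call
    # get_impl(), and callers never need a separate coder callable.
    self._coder_impl = coder_impl
    self._inputs = []

  def append(self, element):
    self._inputs.append(element)
{code}

Call sites would then build buffers as `_ListBuffer(coder.get_impl())`, so only one coder-resolving callable has to be threaded through.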
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386802 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379043647

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -907,6 +961,12 @@ def get_buffer(buffer_id):
     if kind in ('materialize', 'timers'):
       # If `buffer_id` is not a key in `pcoll_buffers`, it will be added by
       # the `defaultdict`.
+      if buffer_id not in pcoll_buffers:
+        coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+            process_bundle_descriptor.transforms[transform_id].spec.payload
+        ).coder_id
+        coder = context.coders[coder_id]

Review comment: Do we have to worry about using safe coders here?

Issue Time Tracking
---
Worklog Id: (was: 386802)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386796 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379039060

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -285,11 +285,50 @@ def partition(self, n):
     pass


-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+    self._input_coder = input_coder
+    self._inputs = []
+    self._grouped_output = None
+    self.cleared = False
+
+  def append(self, element):
+    if self._grouped_output:
+      raise RuntimeError('ListBuffer append after read.')
+    self._inputs.append(element)
+
   def partition(self, n):
     # type: (int) -> List[List[bytes]]
-    return [self[k::n] for k in range(n)]
+    if len(self._inputs) >= n or len(self._inputs) == 0:
+      return [self._inputs[k::n] for k in range(n)]
+    else:
+      if not self._grouped_output:
+        self._grouped_output = [[] for _ in range(n)]
+        coder_impl = self._input_coder.get_impl()
+        decoded_input = []
+        output_stream_list = []
+        for _ in range(n):
+          output_stream_list.append(create_OutputStream())
+        for input in self._inputs:
+          input_stream = create_InputStream(input)
+          while input_stream.size() > 0:
+            decoded_value = coder_impl.decode_from_stream(input_stream, True)
+            decoded_input.append(decoded_value)

Review comment: Rather than creating an in-memory list of all decoded inputs, just append decoded_value to one of the output streams here (incrementing a counter to round-robin).

Issue Time Tracking
---
Worklog Id: (was: 386796)
Time Spent: 1h (was: 50m)
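The streaming round-robin the reviewer describes, as a sketch. `create_InputStream` and `create_OutputStream` are the stream helpers already used in the PR diff (from apache_beam.coders.coder_impl); the standalone function form is illustrative.

{code:python}
from apache_beam.coders.coder_impl import create_InputStream
from apache_beam.coders.coder_impl import create_OutputStream

def partition_round_robin(inputs, coder_impl, n):
  # Decode each element and immediately re-encode it into the next output
  # stream, so no intermediate list of decoded values is materialized.
  output_stream_list = [create_OutputStream() for _ in range(n)]
  idx = 0
  for encoded_page in inputs:
    input_stream = create_InputStream(encoded_page)
    while input_stream.size() > 0:
      decoded_value = coder_impl.decode_from_stream(input_stream, True)
      coder_impl.encode_to_stream(
          decoded_value, output_stream_list[idx % n], True)
      idx += 1
  return [[stream.get()] for stream in output_stream_list]
{code}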
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386801&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386801 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379044005

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -937,6 +997,12 @@ def get_input_coder_impl(transform_id):
           process_bundle_descriptor.transforms[transform_id].spec.payload).
           coder_id]].get_impl()

+    def get_input_coder(transform_id):
+      coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+          process_bundle_descriptor.transforms[transform_id].spec.payload
+      ).coder_id
+      return context.coders[coder_id]

Review comment: safe_coders?

Issue Time Tracking
---
Worklog Id: (was: 386801)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386799 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379044633

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -973,28 +1039,38 @@ def input_for(transform_id, input_id):
       last_sent = data_input

       while True:
-        deferred_inputs = collections.defaultdict(
-            _ListBuffer)  # type: DefaultDict[str, PartitionableBuffer]
+        deferred_inputs = {}

         self._collect_written_timers_and_add_to_deferred_inputs(
             context, pipeline_components, stage, get_buffer, deferred_inputs)

         # Queue any process-initiated delayed bundle applications.
         for delayed_application in last_result.process_bundle.residual_roots:
-          deferred_inputs[input_for(
+          name = input_for(
               delayed_application.application.transform_id,
-              delayed_application.application.input_id)].append(
-                  delayed_application.application.element)
-
+              delayed_application.application.input_id)
+          if name not in deferred_inputs:
+            input_pcoll = process_bundle_descriptor.transforms[
+                delayed_application.application.transform_id].inputs[
+                    delayed_application.application.input_id]
+            coder = context.coders[safe_coders[
+                pipeline_components.pcollections[input_pcoll].coder_id]]
+            deferred_inputs[name] = _ListBuffer(input_coder=coder)
+          deferred_inputs[name].append(delayed_application.application.element)
         # Queue any runner-initiated delayed bundle applications.
         self._add_residuals_and_channel_splits_to_deferred_inputs(
-            splits, get_input_coder_impl, input_for, last_sent, deferred_inputs)
+            splits, get_input_coder_impl, input_for, last_sent, deferred_inputs,
+            get_input_coder)

         if deferred_inputs:
           # The worker will be waiting on these inputs as well.
           for other_input in data_input:
             if other_input not in deferred_inputs:
-              deferred_inputs[other_input] = _ListBuffer([])
+              outputs = process_bundle_descriptor.transforms[

Review comment: This pattern is repeated a lot, factor out into a method?

Issue Time Tracking
---
Worklog Id: (was: 386799)
Time Spent: 1h 20m (was: 1h 10m)
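One way to factor out the repeated lookup, sketched with hypothetical names (`_make_deferred_buffer` and the `self._...` attributes are illustrative; in the PR these values live in closures rather than on `self`).

{code:python}
def _make_deferred_buffer(self, pcoll_id):
  # Resolve the PCollection's (safe) coder and build the matching buffer
  # in one place instead of at every call site.
  coder = self._context.coders[self._safe_coders[
      self._pipeline_components.pcollections[pcoll_id].coder_id]]
  return _ListBuffer(input_coder=coder)
{code}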
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386798&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386798 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379040830

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -683,7 +723,7 @@ def _run_bundle_multiple_times_for_testing(
           worker_handler.state.checkpoint()
         testing_bundle_manager = ParallelBundleManager(
             worker_handler_list,
-            lambda pcoll_id: _ListBuffer(),
+            lambda pcoll_id, transform_id: _ListBuffer(input_coder=None),

Review comment: How is a None input coder safe?

Issue Time Tracking
---
Worklog Id: (was: 386798)
Time Spent: 1h 20m (was: 1h 10m)
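A sketch of one way to make the answer explicit in code: `input_coder=None` only happens to work because the regrouping path is never taken in that test helper, so a guard could fail fast if it ever is. The ValueError and its message are illustrative, not from the PR.

{code:python}
def partition(self, n):
  if len(self._inputs) >= n or len(self._inputs) == 0:
    # This path never touches the coder, which is why input_coder=None
    # happens to be safe for the testing bundle manager.
    return [self._inputs[k::n] for k in range(n)]
  if self._input_coder is None:
    raise ValueError('Cannot repartition a _ListBuffer without a coder.')
  ...  # decode/re-encode regrouping as in the PR
{code}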
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386795 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m

Work Description: robertwb commented on pull request #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379038022

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py

@@ -285,11 +285,50 @@ def partition(self, n):
     pass


-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+    self._input_coder = input_coder
+    self._inputs = []
+    self._grouped_output = None
+    self.cleared = False
+
+  def append(self, element):
+    if self._grouped_output:
+      raise RuntimeError('ListBuffer append after read.')
+    self._inputs.append(element)
+
   def partition(self, n):
     # type: (int) -> List[List[bytes]]
-    return [self[k::n] for k in range(n)]
+    if len(self._inputs) >= n or len(self._inputs) == 0:
+      return [self._inputs[k::n] for k in range(n)]
+    else:
+      if not self._grouped_output:
+        self._grouped_output = [[] for _ in range(n)]
+        coder_impl = self._input_coder.get_impl()
+        decoded_input = []
+        output_stream_list = []
+        for _ in range(n):
+          output_stream_list.append(create_OutputStream())
+        for input in self._inputs:
+          input_stream = create_InputStream(input)
+          while input_stream.size() > 0:
+            decoded_value = coder_impl.decode_from_stream(input_stream, True)
+            decoded_input.append(decoded_value)
+        for idx, v in enumerate(decoded_input):
+          coder_impl.encode_to_stream(v, output_stream_list[idx % n], True)
+        for ix, output_stream in enumerate(output_stream_list):

Review comment: You can replace this for loop with `self._grouped_output = [output_stream.get() for output_stream in output_stream_list]` and omit the assignment above. Similarly, above, you can write `output_stream_list = [create_OutputStream() for _ in range(n)]`.

Issue Time Tracking
---
Worklog Id: (was: 386795)
Time Spent: 1h (was: 50m)
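The comprehension forms of the two loops, written as a fragment meant to drop into `partition()` (so `n`, `self`, and `create_OutputStream` come from the enclosing scope). One caveat worth noting: the original loop stores each partition as a single-element list, `self._grouped_output[ix] = [output_stream.get()]`, so if that shape is intended the comprehension needs the inner brackets shown here.

{code:python}
output_stream_list = [create_OutputStream() for _ in range(n)]
# ... decode and re-encode into the streams as in the diff above ...
self._grouped_output = [[output_stream.get()]
                        for output_stream in output_stream_list]
{code}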
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386765 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 17:23
Start Date: 13/Feb/20 17:23
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585873015

errors: https://builds.apache.org/job/beam_PreCommit_Python_Commit/11130/testReport/

Issue Time Tracking
---
Worklog Id: (was: 386765)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386396&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386396 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 13/Feb/20 05:29
Start Date: 13/Feb/20 05:29
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585558358

R: @robertwb
The current PR passes all tests except the SDF ones. Can you please take a look to check whether I am on the right track before I spend more time on investigation? Thank you.

Issue Time Tracking
---
Worklog Id: (was: 386396)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386300 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 12/Feb/20 23:19
Start Date: 12/Feb/20 23:19
Worklog Time Spent: 10m

Work Description: yifanzou commented on issue #10847: [WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585467369

retest this please

Issue Time Tracking
---
Worklog Id: (was: 386300)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386297&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386297 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 12/Feb/20 23:09
Start Date: 12/Feb/20 23:09
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on issue #10847: [WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585464493

retest this please

Issue Time Tracking
---
Worklog Id: (was: 386297)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
[ https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386296 ]

ASF GitHub Bot logged work on BEAM-9228:

Author: ASF GitHub Bot
Created on: 12/Feb/20 23:09
Start Date: 12/Feb/20 23:09
Worklog Time Spent: 10m

Work Description: Hannah-Jiang commented on pull request #10847: [WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847