[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392212
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 25/Feb/20 00:28
Start Date: 25/Feb/20 00:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 392212)
Time Spent: 3h 20m  (was: 3h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of TFRecord files, obtained by converting Parquet files with
> Spark. Each file is roughly 1GB and I have 11 of them.
> I would expect simple statistics gathering (i.e. counting the number of items
> across all files) to scale linearly with the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below.
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
>
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner = fn_api_runner.FnApiRunner(
>     default_environment=beam_runner_api_pb2.Environment(
>         urn=python_urns.SUBPROCESS_SDK,
>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>                 % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>            | beam.combiners.Count.Globally()
>            | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam version / worker type seems to work (I
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker
> types):
> * beam 2.16: neither the multithreaded nor the multiprocess mode achieves high
> CPU usage on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: did not test the multithreaded mode, but the multiprocess mode
> fails when trying to serialize the Environment instance, most likely because
> of a change between 2.17 and 2.18.
> I also briefly tried the SparkRunner with version 2.16 but was not able to
> achieve any throughput.
> What is the recommended way to achieve what I am trying to do? How can I
> troubleshoot it?
> --
> This is caused by [this
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] was tried, which rolls
> back iobase.py so that it no longer uses _SDFBoundedSourceWrapper. It confirmed
> that data is distributed to multiple workers; however, it causes some
> regressions in the SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392208
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 25/Feb/20 00:07
Start Date: 25/Feb/20 00:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r383587744
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1603,23 +1603,6 @@ def restriction_size(self, element, restriction):
 return restriction.size()
 
 
-class FnApiRunnerSplitTestWithMultiWorkers(FnApiRunnerSplitTest):
 
 Review comment:
   Is there any loss of coverage in removing these tests? I can see how perhaps 
they're too strict. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 392208)
Time Spent: 3h 10m  (was: 3h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392207
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 25/Feb/20 00:07
Start Date: 25/Feb/20 00:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r383587172
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -932,10 +994,14 @@ def get_buffer(buffer_id):
   return pcoll_buffers[buffer_id]
 
 def get_input_coder_impl(transform_id):
-  return context.coders[
-  safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-  process_bundle_descriptor.transforms[transform_id].spec.payload).
-  coder_id]].get_impl()
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload
+  ).coder_id
+  assert coder_id
+  if coder_id in safe_coders:
 
 Review comment:
  An alternative would be `safe_coders.get(coder_id, coder_id)`, which means look up the key (the first argument) and, if it doesn't exist, return the second argument. This would also eliminate some of the repeated logic between the lines.
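
  A minimal sketch of that suggestion (toy dictionary contents, purely to illustrate the `dict.get` fallback; these are not the actual coder ids):

  ```
  # safe_coders presumably maps a coder id to a runner-safe substitute id.
  safe_coders = {'coder_a': 'coder_a_safe'}

  # dict.get(key, default) returns the mapped value when the key is present
  # and the default otherwise, so unknown coder ids fall back to themselves.
  print(safe_coders.get('coder_a', 'coder_a'))  # -> 'coder_a_safe'
  print(safe_coders.get('coder_b', 'coder_b'))  # -> 'coder_b'
  ```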
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 392207)
Time Spent: 3h 10m  (was: 3h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=392059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392059
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 24/Feb/20 20:10
Start Date: 24/Feb/20 20:10
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-590525757
 
 
   kindly ping. @robertwb 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 392059)
Time Spent: 3h  (was: 2h 50m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390983
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 22/Feb/20 00:11
Start Date: 22/Feb/20 00:11
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #10847: 
[BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381568266
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -994,7 +1069,13 @@ def input_for(transform_id, input_id):
 # The worker will be waiting on these inputs as well.
 for other_input in data_input:
   if other_input not in deferred_inputs:
-deferred_inputs[other_input] = _ListBuffer([])
+outputs = process_bundle_descriptor.transforms[
+  other_input].outputs.values()
+coder_id = process_bundle_descriptor.pcollections[
+  only_element(outputs)].coder_id
+coder = context.coders[coder_id]
+deferred_inputs[other_input] = _ListBuffer(
+coder_impl=coder.get_impl())
 
 Review comment:
  As commented at L1082 (of the PR branch), deferred inputs cannot be processed in parallel for now. Would it be better to set coder_impl to None to avoid unnecessary work for now, and add it back later once parallel processing is supported for deferred_inputs?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 390983)
Time Spent: 2h 50m  (was: 2h 40m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390308
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 20/Feb/20 23:11
Start Date: 20/Feb/20 23:11
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-589401291
 
 
   @robertwb, please take a look.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 390308)
Time Spent: 2h 40m  (was: 2.5h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=390261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-390261
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 20/Feb/20 20:54
Start Date: 20/Feb/20 20:54
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-589303274
 
 
   retest it please
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 390261)
Time Spent: 2.5h  (was: 2h 20m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389654&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389654
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 19/Feb/20 21:55
Start Date: 19/Feb/20 21:55
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #10847: 
[BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381568266
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -994,7 +1069,13 @@ def input_for(transform_id, input_id):
 # The worker will be waiting on these inputs as well.
 for other_input in data_input:
   if other_input not in deferred_inputs:
-deferred_inputs[other_input] = _ListBuffer([])
+outputs = process_bundle_descriptor.transforms[
+  other_input].outputs.values()
+coder_id = process_bundle_descriptor.pcollections[
+  only_element(outputs)].coder_id
+coder = context.coders[coder_id]
+deferred_inputs[other_input] = _ListBuffer(
+coder_impl=coder.get_impl())
 
 Review comment:
  As commented at L1082 (of the PR branch), deferred inputs cannot be processed in parallel for now. Would it be better to set coder_impl to None to avoid unnecessary work for now, and add it back later once parallel processing is supported for deferred_inputs?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 389654)
Time Spent: 2h 20m  (was: 2h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389236
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381019006
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -932,10 +998,14 @@ def get_buffer(buffer_id):
   return pcoll_buffers[buffer_id]
 
 def get_input_coder_impl(transform_id):
-  return context.coders[
-  safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-  process_bundle_descriptor.transforms[transform_id].spec.payload).
-  coder_id]].get_impl()
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload).\
+coder_id
+  assert coder_id is not None and coder_id != ''
 
 Review comment:
   OK, in the SDF case we can get the coder of the preceding transform (the one 
that produces its input `ref_PCollection_PCollection_3_split`) which should 
always be a RemoteGrpcPort. 
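
  A rough sketch of that lookup (hedged: the helper name is made up and the descriptor fields are inferred from the snippets quoted in this PR, so this is a sketch rather than the final implementation):

  ```
  from apache_beam.portability.api import beam_fn_api_pb2

  def coder_impl_for_pcollection(process_bundle_descriptor, context, pcoll_id):
    # Find the transform in the bundle descriptor that produces `pcoll_id`;
    # in the SDF case this should be a RemoteGrpcPort transform whose payload
    # carries the coder id for the data channel.
    for transform in process_bundle_descriptor.transforms.values():
      if pcoll_id in transform.outputs.values():
        port = beam_fn_api_pb2.RemoteGrpcPort.FromString(transform.spec.payload)
        return context.coders[port.coder_id].get_impl()
    raise KeyError('no producer found for %s' % pcoll_id)
  ```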
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 389236)
Time Spent: 2h 10m  (was: 2h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389233
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381016780
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -287,43 +287,48 @@ def partition(self, n):
 
 class _ListBuffer():
   """Used to support parititioning of a list."""
-  def __init__(self, input_coder):
-self._input_coder = input_coder
+  def __init__(self, coder_impl):
+self._coder_impl = coder_impl
 self._inputs = []
 self._grouped_output = None
 self.cleared = False
 
   def append(self, element):
+if self.cleared:
+  raise RuntimeError('Trying to append to a cleared ListBuffer.')
 if self._grouped_output:
   raise RuntimeError('ListBuffer append after read.')
 self._inputs.append(element)
 
   def partition(self, n):
 # type: (int) -> List[List[bytes]]
+if self.cleared:
+  raise RuntimeError('Trying to partition a cleared ListBuffer.')
 if len(self._inputs) >= n or len(self._inputs) == 0:
   return [self._inputs[k::n] for k in range(n)]
 else:
   if not self._grouped_output:
-self._grouped_output = [[] for _ in range(n)]
-coder_impl = self._input_coder.get_impl()
-decoded_input = []
-output_stream_list = []
-for _ in range(n):
-  output_stream_list.append(create_OutputStream())
+output_stream_list = [create_OutputStream() for _ in range(n)]
+self._grouped_output = [output_stream.get() for output_stream
 
 Review comment:
   I meant you could put this instead of the loop at line 324.
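
  For reference, a toy illustration of the round-robin split that `partition(n)` relies on above (`self._inputs[k::n]`), separate from the Beam code itself:

  ```
  inputs = [b'e0', b'e1', b'e2', b'e3', b'e4']
  n = 3
  # [k::n] takes every n-th element starting at offset k, so elements are
  # dealt out round-robin across the n partitions.
  partitions = [inputs[k::n] for k in range(n)]
  print(partitions)  # [[b'e0', b'e3'], [b'e1', b'e4'], [b'e2']]
  ```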
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 389233)
Time Spent: 2h  (was: 1h 50m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389235&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389235
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381018471
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -932,10 +998,14 @@ def get_buffer(buffer_id):
   return pcoll_buffers[buffer_id]
 
 def get_input_coder_impl(transform_id):
-  return context.coders[
-  safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-  process_bundle_descriptor.transforms[transform_id].spec.payload).
-  coder_id]].get_impl()
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload).\
 
 Review comment:
   Lint: don't use backslashes for continuation. (These days you can just run 
yapf to format your code, see 
https://cwiki.apache.org/confluence/display/BEAM/Python+Tips.)
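
  A generic before/after illustration of that lint point (not the exact line from this PR):

  ```
  # Discouraged: explicit backslash continuation.
  total = 1 + 2 + \
      3

  # Preferred: wrap the expression in parentheses instead.
  total = (1 + 2 +
           3)
  ```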
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 389235)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=389234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-389234
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 19/Feb/20 00:49
Start Date: 19/Feb/20 00:49
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r381017895
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -769,7 +774,7 @@ def _collect_written_timers_and_add_to_deferred_inputs(
 for windowed_key_timer in timers_by_key_and_window.values():
   windowed_timer_coder_impl.encode_to_stream(
   windowed_key_timer, out, True)
-deferred_inputs[transform_id] = _ListBuffer(input_coder=coder)
+deferred_inputs[transform_id] = _ListBuffer(coder_impl=coder.get_impl())
 
 Review comment:
   You can now revert the changes up at the top of the loop. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 389234)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=387774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387774
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 15/Feb/20 00:59
Start Date: 15/Feb/20 00:59
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #10847: 
[BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379704037
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -932,10 +998,14 @@ def get_buffer(buffer_id):
   return pcoll_buffers[buffer_id]
 
 def get_input_coder_impl(transform_id):
-  return context.coders[
-  safe_coders[beam_fn_api_pb2.RemoteGrpcPort.FromString(
-  process_bundle_descriptor.transforms[transform_id].spec.payload).
-  coder_id]].get_impl()
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload).\
+coder_id
+  assert coder_id is not None and coder_id != ''
 
 Review comment:
  `coder_id` can be retrieved from the transform payload in most cases, except for the following case, which returns an empty (`''`) coder_id.
  This causes the `test_checkpoint_sdf` test to fail. How should we handle this case? The other SDF tests should be fixed now.
   
   ```
   spec {
 urn: "beam:transform:sdf_process_sized_element_and_restrictions:v1"
 payload: "\n\215\014\n 
beam:dofn:pickled_python_info:v1\032\350\013eNp9ldt31FQUxnNmBihpKSCorRdE8TLVOlBaLkXFy0AFI0NNEaJSYiY50wQyyXzJCVKcEVAzxfv9vf0XXcu1fPDBfc5MVy9QXmbNuezz/fa39zm5Wyy7XhCGFVv+6rabcEdwWyy2uA5t41oYO15/hVklTdPkfxRyFE2UrCGaOBtlTZ7QAXNeA9tcp+W4Prfr3GlWROJEaSNOmmnFjROun4lnIh3bc+zoYKBs6RRt283Yy0Ju29hpDfQmnCCioW7toGEriV2ephh8GHEji1wRxHTmUNlnKrwae/ySRNyVY9jE7rJRNDRjm1EyJqszC1qn2C4sayvaddYp3Rqn/4V2aYUlp9rF5WKbtUuNwgpj2n1tubSiMa1d8tgJrVPCnitau3iZZjxtTsOemsG62Fu29pGgmyUJj4Sd8FQkgcLBY9Y2WkmFkwjss3ZK15JF2w2doIn91nYaO60Wjzw8rhaDiPJp0iF4QuDJsrI55WEDI8oCHvYWR5XeOh2b/HVv8ARPqX0itnkzEHjaKva48IzAs0ou9RqV1iIO+IMXNFYd1kolprMhNswG2FABz1m712RsN84iQYcezPH8mMALJg65tl3PglDIuuirBdL9wUcW2zbXSGXhZ53Eaep4ceweXurg5bK1f1M2VOqbgUfKr/ha2S9Ze9c31+zqYvmRous0VyN0jOV4tYPXyv5OX1ceBlEgAifcULNxn3qI2qVA7VKgdpmrDgltmXmszVYKVPTXa4bWReUB6n4NUhy2BmnpYqORcmE60QLHkS4m/BF/tIuj/gF/3Pi7WtAYJsfI1SkTx7ZwdbxW6+B4TeCEiZPWsCxl/372qz29gbRIpLokJU5GnKdqOd7wD1sjazBrtJf6R7zZxVv+iAJe78HpLt4m0mnjH0X6jiR918R7W5BOS9IqkZ4xcbbX8q2QGnCmDyhvXtFgBLi9XViQcO8T3Lkx6Yl/egnnSWvG+Je0CvhAahkmPtxCa0ZqXSCtmomL1p5NRUiD2xyzG2QLJDvQu+wk/BEJm72LJbfO5bi0SvExUcwa/6mML0uKKyasLShmJcUnRPGpic/UpbNtL6bNuFqzdqlhGsYicpo8pcn5eVzLBGwTn1PXO+qBaslrYAce6tYhGq5vWLlSfqDjx+Bm9Rzewx5Al9MAvJ84o8QZJV6tDgitrY3KxBuU+II1KtFagXsj5J696aKn8HMEyhvJjes5bqx/NMJedBb14jeHo3nhGlPmRdK82ERrC/Oa0jyQeYmJ1DpIh04frfOTE8ecqfqkNzVVn6jzqSMnjx6ZPO5OT7j82AREjszEzRxfmLiVY7GD2xT/pYm2f7Xmk70d1cPqA2X7QSRSfPUwn2RinshaIX3G7qiQ8xflV+KcCrmrZoKolQl1Uop76ssWZ2Jt6mvlSZwEC0GEb5bwrcpgw1tE+xRDxeP0GDkiJm/zJXRNLNVq87i/hO9MfE8N8YOJH6khfurg5wf4f/HV5l+X8JuJ39P6PP7o4M8apf1X5X8g4Xjr0\001:\037ref_Coder_FastPrimitivesCoder_4"
   }
   inputs {
 key: "0"
 value: "ref_PCollection_PCollection_3_split"
   }
   outputs {
 key: "None"
 value: "ref_PCollection_PCollection_4"
   }
   unique_name: "SDF/Process"
   environment_id: "ref_Environment_default_environment_1"
   ```
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 387774)
Time Spent: 1h 50m  (was: 1h 40m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386983
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 14/Feb/20 00:18
Start Date: 14/Feb/20 00:18
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #10847: 
[BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379191540
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -907,6 +961,12 @@ def get_buffer(buffer_id):
   if kind in ('materialize', 'timers'):
 # If `buffer_id` is not a key in `pcoll_buffers`, it will be added by
 # the `defaultdict`.
+if buffer_id not in pcoll_buffers:
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload
+  ).coder_id
+  coder = context.coders[coder_id]
 
 Review comment:
  I have checked the code for safe_coders, but still don't have a clear idea about it. What is its purpose? How do we handle coder_ids that are not part of safe_coders? Previously, I tried using safe_coders when the coder_id is in safe_coders and returning `coders[coder_id]` otherwise. That also passes the tests; is it an option here?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386983)
Time Spent: 1h 40m  (was: 1.5h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> A user reported following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use 

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386809=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386809
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:44
Start Date: 13/Feb/20 18:44
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379048737
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -937,6 +997,12 @@ def get_input_coder_impl(transform_id):
   process_bundle_descriptor.transforms[transform_id].spec.payload).
   coder_id]].get_impl()
 
+def get_input_coder(transform_id):
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload
+  ).coder_id
+  return context.coders[coder_id]
 
 Review comment:
   This might fix your SDF issue as well. But if you use get_input_coder_impl 
everywhere, that might also fix things.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386809)
Time Spent: 1.5h  (was: 1h 20m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386797=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386797
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379039482
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -285,11 +285,50 @@ def partition(self, n):
 pass
 
 
-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+self._input_coder = input_coder
+self._inputs = []
+self._grouped_output = None
+self.cleared = False
+
+  def append(self, element):
+if self._grouped_output:
+  raise RuntimeError('ListBuffer append after read.')
+self._inputs.append(element)
+
   def partition(self, n):
 # type: (int) -> List[List[bytes]]
-return [self[k::n] for k in range(n)]
+if len(self._inputs) >= n or len(self._inputs) == 0:
+  return [self._inputs[k::n] for k in range(n)]
+else:
+  if not self._grouped_output:
+self._grouped_output = [[] for _ in range(n)]
+coder_impl = self._input_coder.get_impl()
+decoded_input = []
+output_stream_list = []
+for _ in range(n):
+  output_stream_list.append(create_OutputStream())
+for input in self._inputs:
+  input_stream = create_InputStream(input)
+  while input_stream.size() > 0:
+decoded_value = coder_impl.decode_from_stream(input_stream, True)
+decoded_input.append(decoded_value)
+for idx, v in enumerate(decoded_input):
+  coder_impl.encode_to_stream(v, output_stream_list[idx % n], True)
+for ix, output_stream in enumerate(output_stream_list):
+  self._grouped_output[ix] = [output_stream.get()]
+decoded_input = None
+  return self._grouped_output
+
+  def __iter__(self):
+return itertools.chain(*self.partition(1))
 
 Review comment:
   Can you simply return `iter(self._inputs)`?
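   For illustration, a toy buffer (not the real `_ListBuffer`) showing that, because `_inputs` already holds the encoded chunks in insertion order, `iter(self._inputs)` yields the same bytes as chaining `partition(1)` while skipping the decode/re-encode path:

```python
class BufferSketch(object):
  """Toy stand-in; only the iteration behaviour matters here."""
  def __init__(self):
    self._inputs = []

  def append(self, element_bytes):
    self._inputs.append(element_bytes)

  def __iter__(self):
    # Equivalent to itertools.chain(*self.partition(1)) for already-encoded
    # chunks, but without any decoding work.
    return iter(self._inputs)

buf = BufferSketch()
buf.append(b'chunk-1')
buf.append(b'chunk-2')
assert list(buf) == [b'chunk-1', b'chunk-2']
```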
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386797)
Time Spent: 1h 10m  (was: 1h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither 

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386800=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386800
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379042596
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -728,28 +769,33 @@ def _collect_written_timers_and_add_to_deferred_inputs(
 for windowed_key_timer in timers_by_key_and_window.values():
   windowed_timer_coder_impl.encode_to_stream(
   windowed_key_timer, out, True)
-deferred_inputs[transform_id] = _ListBuffer([out.get()])
-written_timers[:] = []
+deferred_inputs[transform_id] = _ListBuffer(input_coder=coder)
+deferred_inputs[transform_id].append(out.get())
+written_timers.clear()
 
   def _add_residuals_and_channel_splits_to_deferred_inputs(
   self,
   splits,  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
-  get_input_coder_callable,
+  get_input_coder_impl_callable,
   input_for_callable,
   last_sent,
-  deferred_inputs  # type: DefaultDict[str, PartitionableBuffer]
+  deferred_inputs,  # type: DefaultDict[str, PartitionableBuffer]
+  get_input_coder_callable
 
 Review comment:
   This is redundant with get_input_coder_impl_callable--either only take the 
coder-giving one (and call get_impl yourself) or let _ListBuffer take a 
coder_impl in its construction (which may simplify things elsewhere). 
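   A hedged sketch of the second option, with made-up names rather than the real `_ListBuffer` signature: if the buffer stores a coder impl at construction time, call sites only need the impl-returning callable.

```python
class BufferSketch(object):
  """Illustrative only; not the actual _ListBuffer."""
  def __init__(self, coder_impl):
    self._coder_impl = coder_impl  # used later if the buffer must re-partition
    self._inputs = []

  def append(self, element_bytes):
    self._inputs.append(element_bytes)

# A call site would then need only the one callable, roughly:
#   deferred_inputs[name] = BufferSketch(coder_impl=get_input_coder_impl(transform_id))
```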
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386800)
Time Spent: 1h 20m  (was: 1h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> 

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386802=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386802
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379043647
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -907,6 +961,12 @@ def get_buffer(buffer_id):
   if kind in ('materialize', 'timers'):
 # If `buffer_id` is not a key in `pcoll_buffers`, it will be added by
 # the `defaultdict`.
+if buffer_id not in pcoll_buffers:
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload
+  ).coder_id
+  coder = context.coders[coder_id]
 
 Review comment:
   Do we have to worry about using safe coders here? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386802)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386796=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386796
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379039060
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -285,11 +285,50 @@ def partition(self, n):
 pass
 
 
-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+self._input_coder = input_coder
+self._inputs = []
+self._grouped_output = None
+self.cleared = False
+
+  def append(self, element):
+if self._grouped_output:
+  raise RuntimeError('ListBuffer append after read.')
+self._inputs.append(element)
+
   def partition(self, n):
 # type: (int) -> List[List[bytes]]
-return [self[k::n] for k in range(n)]
+if len(self._inputs) >= n or len(self._inputs) == 0:
+  return [self._inputs[k::n] for k in range(n)]
+else:
+  if not self._grouped_output:
+self._grouped_output = [[] for _ in range(n)]
+coder_impl = self._input_coder.get_impl()
+decoded_input = []
+output_stream_list = []
+for _ in range(n):
+  output_stream_list.append(create_OutputStream())
+for input in self._inputs:
+  input_stream = create_InputStream(input)
+  while input_stream.size() > 0:
+decoded_value = coder_impl.decode_from_stream(input_stream, True)
+decoded_input.append(decoded_value)
 
 Review comment:
   Rather than creating an in-memory list of all decoded inputs, just append 
decoded_value to one of the output streams here (incrementing a counter to 
round-robin). 
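   A small self-contained sketch of that suggestion, with toy encode/decode callables and plain lists standing in for Beam's coder impls and output streams (all names here are illustrative): each decoded element is written straight into one of the n buckets, round-robin, so the full decoded list is never materialised.

```python
def partition_round_robin(encoded_chunks, decode, encode, n):
  outputs = [[] for _ in range(n)]  # stand-ins for n output streams
  counter = 0
  for chunk in encoded_chunks:
    for value in decode(chunk):     # stream-decode one chunk at a time
      outputs[counter % n].append(encode(value))
      counter += 1
  return outputs

# Toy "coder": a chunk is a list of ints, an encoded element is its str() bytes.
decode = lambda chunk: chunk
encode = lambda v: str(v).encode()
assert partition_round_robin([[1, 2, 3], [4]], decode, encode, 2) == \
    [[b'1', b'3'], [b'2', b'4']]
```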
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386796)
Time Spent: 1h  (was: 50m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most 

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386801=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386801
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379044005
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -937,6 +997,12 @@ def get_input_coder_impl(transform_id):
   process_bundle_descriptor.transforms[transform_id].spec.payload).
   coder_id]].get_impl()
 
+def get_input_coder(transform_id):
+  coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
+  process_bundle_descriptor.transforms[transform_id].spec.payload
+  ).coder_id
+  return context.coders[coder_id]
 
 Review comment:
   safe_coders?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386801)
Time Spent: 1h 20m  (was: 1h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386799=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386799
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379044633
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -973,28 +1039,38 @@ def input_for(transform_id, input_id):
 last_sent = data_input
 
 while True:
-  deferred_inputs = collections.defaultdict(
-  _ListBuffer)  # type: DefaultDict[str, PartitionableBuffer]
+  deferred_inputs = {}
 
   self._collect_written_timers_and_add_to_deferred_inputs(
   context, pipeline_components, stage, get_buffer, deferred_inputs)
 
   # Queue any process-initiated delayed bundle applications.
   for delayed_application in last_result.process_bundle.residual_roots:
-deferred_inputs[input_for(
+name = input_for(
 delayed_application.application.transform_id,
-delayed_application.application.input_id)].append(
-delayed_application.application.element)
-
+delayed_application.application.input_id)
+if name not in deferred_inputs:
+  input_pcoll = process_bundle_descriptor.transforms[
+      delayed_application.application.transform_id].inputs[
+          delayed_application.application.input_id]
+  coder = context.coders[safe_coders[
+pipeline_components.pcollections[input_pcoll].coder_id]]
+  deferred_inputs[name] = _ListBuffer(input_coder=coder)
+deferred_inputs[name].append(delayed_application.application.element)
   # Queue any runner-initiated delayed bundle applications.
   self._add_residuals_and_channel_splits_to_deferred_inputs(
-  splits, get_input_coder_impl, input_for, last_sent, deferred_inputs)
+  splits, get_input_coder_impl, input_for, last_sent, deferred_inputs,
+  get_input_coder)
 
   if deferred_inputs:
 # The worker will be waiting on these inputs as well.
 for other_input in data_input:
   if other_input not in deferred_inputs:
-deferred_inputs[other_input] = _ListBuffer([])
+outputs = process_bundle_descriptor.transforms[
 
 Review comment:
   This pattern is repeated a lot, factor out into a method?
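   A rough sketch of the kind of helper that could replace the repeated lookup; `transforms`, `coders`, and `FakeBuffer` are illustrative stand-ins for the bundle descriptor, the pipeline context, and `_ListBuffer` (the real code would go through `RemoteGrpcPort` and `safe_coders` as in the diff above).

```python
def make_input_buffer(transform_id, transforms, coders, buffer_cls):
  # Resolve the transform's coder once, in one place, instead of repeating
  # the lookup at every call site that needs a fresh buffer.
  coder_id = transforms[transform_id]['coder_id']
  return buffer_cls(input_coder=coders[coder_id])

class FakeBuffer(object):
  def __init__(self, input_coder):
    self.input_coder = input_coder

transforms = {'read/Impulse': {'coder_id': 'c0'}}
coders = {'c0': 'BytesCoder'}
buf = make_input_buffer('read/Impulse', transforms, coders, FakeBuffer)
assert buf.input_coder == 'BytesCoder'
```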
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386799)
Time Spent: 1h 20m  (was: 1h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386798=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386798
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379040830
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -683,7 +723,7 @@ def _run_bundle_multiple_times_for_testing(
 worker_handler.state.checkpoint()
 testing_bundle_manager = ParallelBundleManager(
 worker_handler_list,
-lambda pcoll_id: _ListBuffer(),
+lambda pcoll_id, transform_id: _ListBuffer(input_coder=None),
 
 Review comment:
   How is a None input coder safe? 
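   One possible way to make the test-only `None` explicit, sketched with illustrative names rather than the real class: only the code path that actually decodes requires the coder, and it fails loudly if the coder is missing.

```python
class BufferSketch(object):
  def __init__(self, input_coder=None):
    self._input_coder = input_coder
    self._inputs = []

  def append(self, element_bytes):
    self._inputs.append(element_bytes)

  def partition(self, n):
    if len(self._inputs) >= n or not self._inputs:
      # Plain slicing: nothing is decoded, so a missing coder is harmless here.
      return [self._inputs[k::n] for k in range(n)]
    if self._input_coder is None:
      raise ValueError('cannot sub-partition elements without an input coder')
    raise NotImplementedError('decode/re-encode path as in the PR')
```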
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386798)
Time Spent: 1h 20m  (was: 1h 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386795=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386795
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 18:37
Start Date: 13/Feb/20 18:37
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#discussion_r379038022
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -285,11 +285,50 @@ def partition(self, n):
 pass
 
 
-class _ListBuffer(List[bytes]):
+class _ListBuffer():
   """Used to support parititioning of a list."""
+  def __init__(self, input_coder):
+self._input_coder = input_coder
+self._inputs = []
+self._grouped_output = None
+self.cleared = False
+
+  def append(self, element):
+if self._grouped_output:
+  raise RuntimeError('ListBuffer append after read.')
+self._inputs.append(element)
+
   def partition(self, n):
 # type: (int) -> List[List[bytes]]
-return [self[k::n] for k in range(n)]
+if len(self._inputs) >= n or len(self._inputs) == 0:
+  return [self._inputs[k::n] for k in range(n)]
+else:
+  if not self._grouped_output:
+self._grouped_output = [[] for _ in range(n)]
+coder_impl = self._input_coder.get_impl()
+decoded_input = []
+output_stream_list = []
+for _ in range(n):
+  output_stream_list.append(create_OutputStream())
+for input in self._inputs:
+  input_stream = create_InputStream(input)
+  while input_stream.size() > 0:
+decoded_value = coder_impl.decode_from_stream(input_stream, True)
+decoded_input.append(decoded_value)
+for idx, v in enumerate(decoded_input):
+  coder_impl.encode_to_stream(v, output_stream_list[idx % n], True)
+for ix, output_stream in enumerate(output_stream_list):
 
 Review comment:
   You can replace this for loop with `self._grouped_output = 
[output_stream.get() for output_stream in output_stream_list]` and omit the 
assignment above. Similarly, above, you can write `output_stream_list = 
[create_OutputStream() for _ in range(n)]`. 
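   For clarity, a tiny runnable version of both comprehensions, with a fake stream class standing in for Beam's `create_OutputStream`; the inner single-element list mirrors `self._grouped_output[ix] = [output_stream.get()]` from the diff.

```python
class FakeOutputStream(object):
  def __init__(self):
    self._parts = []

  def write(self, data):
    self._parts.append(data)

  def get(self):
    return b''.join(self._parts)

n = 3
# Build the streams with a comprehension instead of an append loop...
output_stream_list = [FakeOutputStream() for _ in range(n)]
for i, stream in enumerate(output_stream_list):
  stream.write(str(i).encode())
# ...and collect the grouped output with a comprehension instead of indexed assignment.
grouped_output = [[stream.get()] for stream in output_stream_list]
assert grouped_output == [[b'0'], [b'1'], [b'2']]
```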
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386795)
Time Spent: 1h  (was: 50m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0, 2.19.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; 

[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386765=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386765
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 17:23
Start Date: 13/Feb/20 17:23
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: [BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585873015
 
 
   errors: 
https://builds.apache.org/job/beam_PreCommit_Python_Commit/11130/testReport/
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386765)
Time Spent: 50m  (was: 40m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386396
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 13/Feb/20 05:29
Start Date: 13/Feb/20 05:29
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: 
[WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585558358
 
 
   R: @robertwb 
   The current PR passes all tests except the SDF ones. Can you please take a 
look to see if I am on the right track before I spend more time on the 
investigation? Thank you.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386396)
Time Spent: 40m  (was: 0.5h)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386300=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386300
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 12/Feb/20 23:19
Start Date: 12/Feb/20 23:19
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #10847: [WIP][BEAM-9228] 
Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585467369
 
 
   retest this please
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386300)
Time Spent: 0.5h  (was: 20m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386297=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386297
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 12/Feb/20 23:09
Start Date: 12/Feb/20 23:09
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on issue #10847: 
[WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847#issuecomment-585464493
 
 
   retest this please
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 386297)
Time Spent: 20m  (was: 10m)

> _SDFBoundedSourceWrapper doesn't distribute data to multiple workers
> 
>
> Key: BEAM-9228
> URL: https://issues.apache.org/jira/browse/BEAM-9228
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.16.0, 2.18.0
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> A user reported the following issue.
> -
> I have a set of tfrecord files, obtained by converting parquet files with 
> Spark. Each file is roughly 1GB and I have 11 of those.
> I would expect simple statistics gathering (ie counting number of items of 
> all files) to scale linearly with respect to the number of cores on my system.
> I am able to reproduce the issue with the minimal snippet below
> {code:java}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
> file_pattern = 'part-r-00*'
> runner=fn_api_runner.FnApiRunner(
>   default_environment=beam_runner_api_pb2.Environment(
>   urn=python_urns.SUBPROCESS_SDK,
>   payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
> % sys.executable.encode('ascii')))
> p = beam.Pipeline(runner=runner, options=pipeline_options)
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>  | beam.combiners.Count.Globally()
>  | beam.io.WriteToText('/tmp/output'))
> p.run()
> {code}
> Only one combination of apache_beam revision / worker type seems to work (I 
> refer to https://beam.apache.org/documentation/runners/direct/ for the worker 
> types)
> * beam 2.16; neither multithread nor multiprocess achieve high cpu usage on 
> multiple cores
> * beam 2.17: able to achieve high cpu usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode but the multiprocess mode fails 
> when trying to serialize the Environment instance most likely because of a 
> change from 2.17 to 2.18.
> I also tried briefly SparkRunner with version 2.16 but was not able to achieve 
> any throughput.
> What is the recommended way to achieve what I am trying to do? How can I 
> troubleshoot?
> --
> This is caused by [this 
> PR|https://github.com/apache/beam/commit/02f8ad4eee3ec0ea8cbdc0f99c1dad29f00a9f60].
> A [workaround|https://github.com/apache/beam/pull/10729] is tried, which is 
> rolling back iobase.py not to use _SDFBoundedSourceWrapper. This confirmed 
> that data is distributed to multiple workers, however, there are some 
> regressions with SDF wrapper tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9228) _SDFBoundedSourceWrapper doesn't distribute data to multiple workers

2020-02-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9228?focusedWorklogId=386296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386296
 ]

ASF GitHub Bot logged work on BEAM-9228:


Author: ASF GitHub Bot
Created on: 12/Feb/20 23:09
Start Date: 12/Feb/20 23:09
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #10847: 
[WIP][BEAM-9228] Support further partition for FnApi ListBuffer
URL: https://github.com/apache/beam/pull/10847
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   (The Go, Java, and Python rows of this table are Jenkins build-status badge links for each runner; see https://builds.apache.org/.)
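
The PR title, "Support further partition for FnApi ListBuffer", refers to letting the FnApiRunner split its buffered input into several partitions so that more than one SDK worker can process the data. The actual change lives in the runner internals; the snippet below is only an illustrative sketch of the partitioning idea, using a hypothetical ListBuffer class that holds encoded element pages (the names and structure are assumptions, not the code from the PR):

```python
from typing import List


class ListBuffer:
    """Hypothetical stand-in for a runner-side buffer of encoded elements."""

    def __init__(self) -> None:
        self._pages: List[bytes] = []

    def append(self, encoded_page: bytes) -> None:
        self._pages.append(encoded_page)

    def partition(self, n: int) -> List[List[bytes]]:
        # Spread the buffered pages round-robin over at most n partitions so
        # each partition can be handed to a separate worker as its own bundle.
        parts: List[List[bytes]] = [[] for _ in range(n)]
        for i, page in enumerate(self._pages):
            parts[i % n].append(page)
        return [part for part in parts if part]  # drop empty partitions


# Usage sketch: four workers each receive a roughly equal share of the buffer.
buf = ListBuffer()
for i in range(10):
    buf.append(b'page-%d' % i)
for worker_input in buf.partition(4):
    print(len(worker_input), worker_input[0])
```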