This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 36bf02c  [BEAM-11504] Clean up direct runner parallelism section.
     new d72b5d9  Merge pull request #13589 from ibzib/BEAM-11504

36bf02c is described below

commit 36bf02cfb2b37a894f6555c0038f06ba6a779e3a
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Mon Dec 21 13:12:10 2020 -0800

    [BEAM-11504] Clean up direct runner parallelism section.

    - Remove unparseable language tags.
    - Add corresponding Java instructions.
    - Remove obsolete (version < 2.19) Python instructions.
    - Defer to programming guide for general instructions for setting pipeline options.
---
 .../content/en/documentation/runners/direct.md | 112 +++++-----------------
 1 file changed, 25 insertions(+), 87 deletions(-)

diff --git a/website/www/site/content/en/documentation/runners/direct.md b/website/www/site/content/en/documentation/runners/direct.md
index 0168dcc..1249aa9 100644
--- a/website/www/site/content/en/documentation/runners/direct.md
+++ b/website/www/site/content/en/documentation/runners/direct.md
@@ -57,6 +57,8 @@ Here are some resources with information about how to test your pipelines.

 ## Pipeline options for the Direct Runner

+For general instructions on how to set pipeline options, see the [programming guide](/documentation/programming-guide/#configuring-pipeline-options).
+
 When executing your pipeline from the command-line, set `runner` to `direct` or `DirectRunner`.
 The default values for the other pipeline options are generally sufficient.
 See the reference documentation for the
@@ -74,105 +76,41 @@ Local execution is limited by the memory available in your local environment. It

 If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.

-{:.language-py}
-### Execution Mode
+### Parallel execution

-{:.language-py}
+{{< paragraph class="language-py" >}}
 Python [FnApiRunner](https://beam.apache.org/contribute/runner-guide/#the-fn-api) supports multi-threading and multi-processing mode.
+{{< /paragraph >}}
+
+#### Setting parallelism

-{:.language-py}
-<strong>Setting parallelism</strong>
+{{< paragraph class="language-java" >}}
+The number of worker threads is defined by the `targetParallelism` pipeline option.
+By default, `targetParallelism` is the greater of the number of available processors and 3.
+{{< /paragraph >}}

-{:.language-py}
-Number of threads or subprocesses is defined by setting the `direct_num_workers` option.
+{{< paragraph class="language-py" >}}
+Number of threads or subprocesses is defined by setting the `direct_num_workers` pipeline option.
 From 2.22.0, `direct_num_workers = 0` is supported. When `direct_num_workers` is set to 0, it will set the number of threads/subprocess to the number of cores of the machine where the pipeline is running.
+{{< /paragraph >}}

-{:.language-py}
-* There are several ways to set this option.
-```py
-python wordcount.py --input xx --output xx --direct_num_workers 2
-```
-
-{:.language-py}
-* Setting with `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import PipelineOptions
-pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
-```
-
-{:.language-py}
-* Adding to existing `PipelineOptions`.
-```py
-from apache_beam.options.pipeline_options import DirectOptions
-pipeline_options = PipelineOptions(xxx)
-pipeline_options.view_as(DirectOptions).direct_num_workers = 2
-```
-
-{:.language-py}
+{{< paragraph class="language-py" >}}
 <strong>Setting running mode</strong>
+{{< /paragraph >}}

-{:.language-py}
-From 2.19, a new option was added to set running mode. We can use `direct_running_mode` option to set the running mode.
+{{< paragraph class="language-py" >}}
+In Beam 2.19.0 and newer, you can use the `direct_running_mode` pipeline option to set the running mode.
 `direct_running_mode` can be one of [`'in_memory'`, `'multi_threading'`, `'multi_processing'`].
+{{< /paragraph >}}

-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>in_memory</b>: Runner and workers' communication happens in memory (not through gRPC). This is a default mode.
+{{< /paragraph >}}

-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>multi_threading</b>: Runner and workers communicate through gRPC and each worker runs in a thread.
+{{< /paragraph >}}

-{:.language-py}
+{{< paragraph class="language-py" >}}
 <b>multi_processing</b>: Runner and workers communicate through gRPC and each worker runs in a subprocess.
-
-{:.language-py}
-Same as other options, `direct_running_mode` can be passed through CLI or set with `PipelineOptions`.
-
-{:.language-py}
-For the versions before 2.19.0, the running mode should be set with `FnApiRunner()`. Please refer following examples.
-
-{:.language-py}
-#### Running with multi-threading mode
-```py
-import argparse
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-                  runner=fn_api_runner.FnApiRunner(
-                    default_environment=beam_runner_api_pb2.Environment(
-                      urn=python_urns.EMBEDDED_PYTHON_GRPC)))
-```
-
-{:.language-py}
-#### Running with multi-processing mode
-```py
-import argparse
-import sys
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.runners.portability import fn_api_runner
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability import python_urns
-
-parser = argparse.ArgumentParser()
-parser.add_argument(...)
-known_args, pipeline_args = parser.parse_known_args(argv)
-pipeline_options = PipelineOptions(pipeline_args)
-
-p = beam.Pipeline(options=pipeline_options,
-                  runner=fn_api_runner.FnApiRunner(
-                    default_environment=beam_runner_api_pb2.Environment(
-                      urn=python_urns.SUBPROCESS_SDK,
-                      payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
-                      % sys.executable.encode('ascii'))))
-```
+{{< /paragraph >}}