This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new c679de6  [BEAM-9637] Add all runners to Python --runner help text.
     new 0350f2e  Merge pull request #13584 from ibzib/BEAM-9637
c679de6 is described below

commit c679de69d26de7472ab50fd4704cde24281205d7
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Fri Dec 18 12:41:23 2020 -0800

    [BEAM-9637] Add all runners to Python --runner help text.
---
 .../python/apache_beam/options/pipeline_options.py | 19 ++++++++++++++++-
 sdks/python/apache_beam/runners/runner.py          | 24 +++++++---------------
 2 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index c169586..0effcf6 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -414,13 +414,30 @@ class StandardOptions(PipelineOptions):
 
   DEFAULT_RUNNER = 'DirectRunner'
 
+  ALL_KNOWN_RUNNERS = (
+      'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
+      'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
+      'apache_beam.runners.direct.direct_runner.DirectRunner',
+      'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
+      'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
+      'apache_beam.runners.portability.flink_runner.FlinkRunner',
+      'apache_beam.runners.portability.portable_runner.PortableRunner',
+      'apache_beam.runners.portability.spark_runner.SparkRunner',
+      'apache_beam.runners.test.TestDirectRunner',
+      'apache_beam.runners.test.TestDataflowRunner',
+  )
+
+  KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in ALL_KNOWN_RUNNERS]
+
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--runner',
         help=(
             'Pipeline runner used to execute the workflow. Valid values are '
-            'DirectRunner, DataflowRunner.'))
+            'one of %s, or the fully qualified name of a PipelineRunner '
+            'subclass. If unspecified, defaults to %s.' %
+            (', '.join(cls.KNOWN_RUNNER_NAMES), cls.DEFAULT_RUNNER)))
     # Whether to enable streaming mode.
     parser.add_argument(
         '--streaming',
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index ba80b27..02ed845 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -31,6 +31,8 @@ from builtins import object
 from typing import TYPE_CHECKING
 from typing import Optional
 
+from apache_beam.options.pipeline_options import StandardOptions
+
 if TYPE_CHECKING:
   from apache_beam import pvalue
   from apache_beam import PTransform
@@ -41,22 +43,10 @@ if TYPE_CHECKING:
 
 __all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']
 
-_ALL_KNOWN_RUNNERS = (
-    'apache_beam.runners.dataflow.dataflow_runner.DataflowRunner',
-    'apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner',
-    'apache_beam.runners.direct.direct_runner.DirectRunner',
-    'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
-    'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
-    'apache_beam.runners.portability.flink_runner.FlinkRunner',
-    'apache_beam.runners.portability.portable_runner.PortableRunner',
-    'apache_beam.runners.portability.spark_runner.SparkRunner',
-    'apache_beam.runners.test.TestDirectRunner',
-    'apache_beam.runners.test.TestDataflowRunner',
-)
-
-_KNOWN_RUNNER_NAMES = [path.split('.')[-1] for path in _ALL_KNOWN_RUNNERS]
-
-_RUNNER_MAP = {path.split('.')[-1].lower(): path for path in _ALL_KNOWN_RUNNERS}
+_RUNNER_MAP = {
+    path.split('.')[-1].lower(): path
+    for path in StandardOptions.ALL_KNOWN_RUNNERS
+}
 
 # Allow this alias, but don't make public.
 _RUNNER_MAP['pythonrpcdirectrunner'] = (
@@ -110,7 +100,7 @@ def create_runner(runner_name):
     raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are %s '
         'or the fully qualified name of a PipelineRunner subclass.' %
-        (runner_name, ', '.join(_KNOWN_RUNNER_NAMES)))
+        (runner_name, ', '.join(StandardOptions.KNOWN_RUNNER_NAMES)))
 
 
 class PipelineRunner(object):