This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch users/damccorm/prism-revert in repository https://gitbox.apache.org/repos/asf/beam.git
commit 34fb5d54b2b4dec264d4ac0f5e64bbb068ae4b5e Author: Danny Mccormick <[email protected]> AuthorDate: Wed Jul 16 13:27:11 2025 -0400 Temporarily move back to fnapi runner as default --- .../apache_beam/runners/direct/direct_runner.py | 29 +++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 564a6c7df20..628be01145c 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -195,9 +195,23 @@ class SwitchingDirectRunner(PipelineRunner): # Use BundleBasedDirectRunner if other runners are missing needed features. runner = BundleBasedDirectRunner() + + + # Check whether all transforms used in the pipeline are supported by the + # FnApiRunner, and the pipeline was not meant to be run as streaming. + if _FnApiRunnerSupportVisitor().accept(pipeline): + from apache_beam.portability.api import beam_provision_api_pb2 + from apache_beam.runners.portability.fn_api_runner import fn_runner + from apache_beam.runners.portability.portable_runner import JobServiceHandle + all_options = options.get_all_options() + encoded_options = JobServiceHandle.encode_pipeline_options(all_options) + provision_info = fn_runner.ExtendedProvisionInfo( + beam_provision_api_pb2.ProvisionInfo( + pipeline_options=encoded_options)) + runner = fn_runner.FnApiRunner(provision_info=provision_info) # Check whether all transforms used in the pipeline are supported by the # PrismRunner - if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): + elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): _LOGGER.info('Running pipeline with PrismRunner.') from apache_beam.runners.portability import prism_runner runner = prism_runner.PrismRunner() @@ -221,19 +235,6 @@ class SwitchingDirectRunner(PipelineRunner): _LOGGER.info('Falling back to DirectRunner') 
runner = BundleBasedDirectRunner() - # Check whether all transforms used in the pipeline are supported by the - # FnApiRunner, and the pipeline was not meant to be run as streaming. - if _FnApiRunnerSupportVisitor().accept(pipeline): - from apache_beam.portability.api import beam_provision_api_pb2 - from apache_beam.runners.portability.fn_api_runner import fn_runner - from apache_beam.runners.portability.portable_runner import JobServiceHandle - all_options = options.get_all_options() - encoded_options = JobServiceHandle.encode_pipeline_options(all_options) - provision_info = fn_runner.ExtendedProvisionInfo( - beam_provision_api_pb2.ProvisionInfo( - pipeline_options=encoded_options)) - runner = fn_runner.FnApiRunner(provision_info=provision_info) - return runner.run_pipeline(pipeline, options)
