tvalentyn commented on code in PR #27512:
URL: https://github.com/apache/beam/pull/27512#discussion_r1267335936
##########
sdks/python/apache_beam/transforms/environments.py:
##########
@@ -699,6 +721,26 @@ def default(cls):
return cls(capabilities=python_sdk_capabilities(), artifacts=())
[email protected]_urn(python_urns.EMBEDDED_PYTHON_LOOPBACK, None)
+class PythonLoopbackEnvironment(EmbeddedPythonEnvironment):
+ """Used as a stub when the loopback worker has not yet been started."""
+ def to_runner_api_parameter(self, context):
Review Comment:
Should we add the type hint? I think it might be:
`# type: (PipelineContext) -> typing.Tuple[str, message.Message]`
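For illustration, a minimal sketch of how such a PEP 484 type comment sits on the method. `PipelineContext`, `Message`, `LoopbackStub`, and the URN string below are hypothetical stand-ins so the snippet runs on its own; they are not the real Beam classes or the code in this PR.

```python
import typing


class PipelineContext:
  """Hypothetical stand-in for the real pipeline context class."""


class Message:
  """Hypothetical stand-in for a protobuf message type."""


class LoopbackStub:
  """Hypothetical class; only shows where the type comment is placed."""
  def to_runner_api_parameter(self, context):
    # type: (PipelineContext) -> typing.Tuple[str, Message]
    # The type comment goes on the first line of the function body.
    # It is read by type checkers and ignored at runtime.
    return 'example:urn', Message()


urn, payload = LoopbackStub().to_runner_api_parameter(PipelineContext())
```

Because type comments are plain comments, adding one does not change runtime behavior; it only affects static analysis.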
##########
sdks/python/apache_beam/runners/runner.py:
##########
@@ -158,181 +177,39 @@ def run_pipeline(
# type: (...) -> PipelineResult
"""Execute the entire pipeline or the sub-DAG reachable from a node.
-
- Runners should override this method.
"""
- raise NotImplementedError
+ pipeline.visit(
+ group_by_key_input_visitor(
+ not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
+ )
+
+ # TODO: https://github.com/apache/beam/issues/19168
+ # portable runner specific default
Review Comment:
We can, and plan to, make this a default for Dataflow as well:
https://github.com/apache/beam/issues/26996
##########
sdks/python/apache_beam/runners/runner.py:
##########
@@ -158,181 +177,39 @@ def run_pipeline(
# type: (...) -> PipelineResult
"""Execute the entire pipeline or the sub-DAG reachable from a node.
-
- Runners should override this method.
"""
- raise NotImplementedError
+ pipeline.visit(
+ group_by_key_input_visitor(
+ not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
+ )
+
+ # TODO: https://github.com/apache/beam/issues/19168
+ # portable runner specific default
+ if options.view_as(SetupOptions).sdk_location == 'default':
+ options.view_as(SetupOptions).sdk_location = 'container'
+
+ return self.run_full_pipeline(
Review Comment:
What is the semantic distinction between `run_pipeline` and
`run_full_pipeline`? It sounds like `run_pipeline` could execute
subgraphs, but it calls into `run_full_pipeline`, which is supposed to run the
entire graph.