tvalentyn commented on code in PR #31458:
URL: https://github.com/apache/beam/pull/31458#discussion_r1624272603
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -564,13 +600,21 @@ def invoke_setup(self):
"""Invokes the DoFn.setup() method
"""
+ self._setup_context_values = {
+ d: d.context_manager.__enter__()
Review Comment:
`d` here and below no longer refers to 'default', so readers might wonder
what `d` stands for. Consider using `c` or something else.
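For illustration, here is a minimal, self-contained sketch of the renamed comprehension. `ContextParam` and `SetupContexts` are hypothetical stand-ins, not the classes in this PR. The sketch also replaces the direct `__enter__()` call with `contextlib.ExitStack`. That is an assumption about one possible design, not what the PR does.

```python
import contextlib


class ContextParam:
  """Hypothetical stand-in for a context param wrapping a context manager."""
  def __init__(self, context_manager):
    self.context_manager = context_manager


class SetupContexts:
  """Hypothetical helper: enter contexts at setup, exit them at teardown."""
  def __init__(self, context_params):
    self._context_params = list(context_params)
    self._stack = contextlib.ExitStack()
    self.values = {}

  def setup(self):
    # `param` says what each key is; `d` only made sense when it meant
    # "default". enter_context() calls __enter__ and returns its result.
    self.values = {
        param: self._stack.enter_context(param.context_manager)
        for param in self._context_params
    }

  def teardown(self):
    # close() calls each recorded __exit__ in reverse order of entry.
    self._stack.close()
    self.values = {}


events = []


@contextlib.contextmanager
def resource(name):
  events.append(f'enter {name}')
  yield name.upper()
  events.append(f'exit {name}')


a, b = ContextParam(resource('a')), ContextParam(resource('b'))
contexts = SetupContexts([a, b])
contexts.setup()
print(contexts.values[a], contexts.values[b])  # A B
contexts.teardown()
print(events)  # ['enter a', 'enter b', 'exit b', 'exit a']
```

The `ExitStack` variant has a side benefit. `close()` exits contexts in reverse order of entry, and it keeps unwinding the remaining contexts if one `__exit__` raises. Calling `__enter__`/`__exit__` by hand does not provide this automatically.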
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -525,6 +525,44 @@ def __init__(
self.param_id = 'WatermarkEstimatorProvider'
+class _BundleContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage bundle-scoped parameters.
+
+ The context will be entered at the start of each bundle and exited at the
+ end, equivalent to the `start_bundle` and `finish_bundle` methods on a DoFn.
+
+ The object returned from `__enter__`, if any will be substituted for this
Review Comment:
```suggestion
The object returned from `__enter__`, if any, will be substituted for this
```
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -525,6 +525,44 @@ def __init__(
self.param_id = 'WatermarkEstimatorProvider'
+class _BundleContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage bundle-scoped parameters.
+
+ The context will be entered at the start of each bundle and exited at the
+ end, equivalent to the `start_bundle` and `finish_bundle` methods on a DoFn.
+
+ The object returned from `__enter__`, if any will be substituted for this
+ parameter in invocations.
+
+ This can be especially useful for setting up shared context in transforms
+ like `Map`, `FlatMap`, and `Filter` where one does not have start_bundle
+ and finish_bundle methods.
+ """
+ def __init__(self, context_manager, name=None):
+ super().__init__(f'BundleContextParam_{name or id(self)}')
+ self.context_manager = context_manager
+
+
+class _SetupContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage DoFn-scoped parameters.
+
+ The context will be entered before the DoFn is used and exited when it is
+ discarded, equivalent to the `setup` and `teardown` methods of a DoFn.
+ (Note, like `teardown`, exiting is best effort, as workers may be killed
+ before all DoFns are torn down.)
+
+ The object returned from `__enter__`, if any will be substituted for this
+ parameter in invocations.
+
+ This can be useful for setting up shared resources like persistent
+ connections to external services for transforms like `Map`, `FlatMap`, and
Review Comment:
Can we add an example in
https://beam.apache.org/documentation/transforms/python/elementwise/map/ ?
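A doc example could start from the semantics described in the docstring above. The following Beam-free sketch models them: a setup-scoped context is entered once per DoFn instance, and its `__enter__` value is passed to every call across bundles. `FakeConnection`, `connection`, and `run_fake_map` are hypothetical. A real example on the Map page would presumably pass the context manager to `DoFn.SetupContextParam` as a default argument of the mapped function, using whatever signature the final version of this PR settles on.

```python
import contextlib


class FakeConnection:
  """Hypothetical client standing in for a connection to an external service."""
  opened = 0

  def __init__(self):
    FakeConnection.opened += 1
    self.closed = False

  def lookup(self, key):
    return f'value-for-{key}'

  def close(self):
    self.closed = True


@contextlib.contextmanager
def connection():
  # Setup-scoped: entered once per DoFn instance, like DoFn.setup().
  conn = FakeConnection()
  try:
    yield conn
  finally:
    # Like teardown, a real runner would only run this on a best-effort basis.
    conn.close()


def run_fake_map(fn, bundles, context_manager):
  """Toy model of one DoFn instance processing several bundles.

  Enters the context once, passes its __enter__ value to every call, and
  exits after the last bundle.
  """
  outputs = []
  with context_manager as conn:
    for bundle in bundles:
      outputs.extend(fn(element, conn) for element in bundle)
  return outputs


results = run_fake_map(
    lambda key, conn: conn.lookup(key), [['a', 'b'], ['c']], connection())
print(results)  # ['value-for-a', 'value-for-b', 'value-for-c']
print(FakeConnection.opened)  # 1
```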
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -525,6 +525,44 @@ def __init__(
self.param_id = 'WatermarkEstimatorProvider'
+class _BundleContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage bundle-scoped parameters.
+
+ The context will be entered at the start of each bundle and exited at the
+ end, equivalent to the `start_bundle` and `finish_bundle` methods on a DoFn.
+
+ The object returned from `__enter__`, if any will be substituted for this
+ parameter in invocations.
+
+ This can be especially useful for setting up shared context in transforms
+ like `Map`, `FlatMap`, and `Filter` where one does not have start_bundle
+ and finish_bundle methods.
+ """
+ def __init__(self, context_manager, name=None):
+ super().__init__(f'BundleContextParam_{name or id(self)}')
+ self.context_manager = context_manager
+
+
+class _SetupContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage DoFn-scoped parameters.
+
+ The context will be entered before the DoFn is used and exited when it is
+ discarded, equivalent to the `setup` and `teardown` methods of a DoFn.
+ (Note, like `teardown`, exiting is best effort, as workers may be killed
+ before all DoFns are torn down.)
+
+ The object returned from `__enter__`, if any will be substituted for this
Review Comment:
```suggestion
The object returned from `__enter__`, if any, will be substituted for this
```
##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -430,6 +430,42 @@ def has_bundle_finalization(self):
pass
return False
+ def get_bundle_contexts(self):
+ seen = set()
+ for sig in (self.setup_lifecycle_method,
Review Comment:
This might be unactionable, but reading this I was wondering how we know
whether the list of method signatures to check here is complete.
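Here is one possible way to make completeness checkable, sketched under assumptions. Keep the method names in a single tuple. Then add a test that scans every method of a DoFn for context-param defaults and fails if a method outside the tuple declares one. `ContextParam`, `CONTEXT_CAPABLE_METHODS`, and both functions are hypothetical names. The diff iterates over method wrappers such as `self.setup_lifecycle_method`, while this sketch uses `inspect.signature` to stay self-contained.

```python
import inspect


class ContextParam:
  """Hypothetical marker used as a default argument value."""
  def __init__(self, name):
    self.name = name


# Hypothetical: a single tuple naming every method whose parameters may carry
# context params, so there is one place to audit for completeness.
CONTEXT_CAPABLE_METHODS = ('setup', 'start_bundle', 'process', 'finish_bundle')


def get_context_params(dofn):
  """Collects context params from the listed methods, without duplicates."""
  seen = set()
  params = []
  for method_name in CONTEXT_CAPABLE_METHODS:
    method = getattr(dofn, method_name, None)
    if method is None:
      continue
    for p in inspect.signature(method).parameters.values():
      # The same param object may be the default in several methods.
      if isinstance(p.default, ContextParam) and p.default not in seen:
        seen.add(p.default)
        params.append(p.default)
  return params


def methods_declaring_context_params(dofn):
  """Names of *all* methods on `dofn` that have a ContextParam default."""
  names = set()
  for name, member in inspect.getmembers(dofn, inspect.ismethod):
    try:
      sig = inspect.signature(member)
    except (TypeError, ValueError):
      continue
    if any(isinstance(p.default, ContextParam)
           for p in sig.parameters.values()):
      names.add(name)
  return names


SHARED = ContextParam('shared')


class MyDoFn:
  def setup(self, s=SHARED):
    pass

  def process(self, element, s=SHARED, b=ContextParam('bundle')):
    yield element


print([p.name for p in get_context_params(MyDoFn())])  # ['shared', 'bundle']
print(sorted(methods_declaring_context_params(MyDoFn())))  # ['process', 'setup']
```

A unit test could then assert `methods_declaring_context_params(dofn) <= set(CONTEXT_CAPABLE_METHODS)`, which would flag a newly added method that nobody remembered to list.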
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -525,6 +525,44 @@ def __init__(
self.param_id = 'WatermarkEstimatorProvider'
+class _BundleContextParam(_DoFnParam):
Review Comment:
Can we have an example usage in the docstring?
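A possible docstring example follows, written as a Beam-free model of bundle scope. The context is entered at the start of each bundle and exited at its end, and each call receives the `__enter__` value. `BatchingWriter` and `run_bundles` are hypothetical. If the same context manager object is entered once per bundle, as the `context_manager` attribute in this diff suggests, then it must be re-enterable. For that reason the sketch uses a class rather than `@contextlib.contextmanager`, whose objects can only be entered once.

```python
class BatchingWriter:
  """Hypothetical reusable context manager that buffers writes per bundle.

  Class-based on purpose: a @contextlib.contextmanager object is single-use,
  but a bundle-scoped context is entered once per bundle.
  """
  def __init__(self, sink):
    self.sink = sink
    self._buffer = None

  def __enter__(self):
    self._buffer = []  # fresh buffer at the start of each bundle
    return self._buffer

  def __exit__(self, exc_type, exc, tb):
    # Flush at the end of the bundle, as finish_bundle would.
    if exc_type is None:
      self.sink.append(list(self._buffer))
    self._buffer = None
    return False  # do not suppress exceptions


def run_bundles(fn, bundles, context_manager):
  """Toy model: enter the context per bundle and pass its value to each call."""
  for bundle in bundles:
    with context_manager as value:
      for element in bundle:
        fn(element, value)


sink = []
run_bundles(
    lambda x, buf: buf.append(x * 10), [[1, 2], [3]], BatchingWriter(sink))
print(sink)  # [[10, 20], [30]]
```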
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -525,6 +525,44 @@ def __init__(
self.param_id = 'WatermarkEstimatorProvider'
+class _BundleContextParam(_DoFnParam):
+ """Allows one to use a context manager to manage bundle-scoped parameters.
+
+ The context will be entered at the start of each bundle and exited at the
+ end, equivalent to the `start_bundle` and `finish_bundle` methods on a DoFn.
+
+ The object returned from `__enter__`, if any will be substituted for this
+ parameter in invocations.
Review Comment:
We should mention somewhere that the order of the context invocations is
unspecified.
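The consequence for users is that code must not assume one context param is entered before another. Below is a minimal sketch of a workaround the docs could suggest, using plain `contextlib` and hypothetical resource names. When one resource depends on another, wrap both in a single context manager so that it fixes the order itself.

```python
import contextlib

log = []


@contextlib.contextmanager
def open_pool():
  log.append('open pool')
  yield 'pool'
  log.append('close pool')


@contextlib.contextmanager
def open_session(pool):
  log.append(f'open session on {pool}')
  yield f'session({pool})'
  log.append('close session')


@contextlib.contextmanager
def session_with_pool():
  # The session needs the pool, so one context manager owns both and fixes
  # the order, instead of relying on the (unspecified) order in which
  # separate context params happen to be entered.
  with open_pool() as pool, open_session(pool) as session:
    yield session


with session_with_pool() as session:
  log.append(f'use {session}')
print(log)
# ['open pool', 'open session on pool', 'use session(pool)',
#  'close session', 'close pool']
```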
##########
website/www/site/content/en/documentation/transforms/python/elementwise/pardo.md:
##########
@@ -76,10 +76,12 @@ starts and finishes with `start_bundle` and `finish_bundle`.
multiple instances of a given `DoFn` subclass may be created (e.g., due to
parallelization, or due to garbage collection after a period
of disuse).
This is a good place to connect to database instances, open network
connections or other resources.
+ See also `DoFn.SetupContextParam` for a way to accomplish this via context
managers.
Review Comment:
1. How do we make this documentation properly discoverable without making
users search the Beam GitHub repo for the docstring of an internal class?
Should DoFn.BundleContextParam copy the docstring so that it is
visible in
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.process,
or is there a better option?
2. Let's update the `The full set of parameters is:` section in
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.process
--
This is an automated message from the Apache Git Service.