This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new daf81143388 Merge pull request #31449 Pass through docs (and configs) for SqlProviders. daf81143388 is described below commit daf811433886a5efc2df8f1616aefa9c42706c93 Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Fri May 31 09:39:07 2024 -0700 Merge pull request #31449 Pass through docs (and configs) for SqlProviders. --- sdks/python/apache_beam/yaml/yaml_join.py | 45 +++++++++++++++++++++------ sdks/python/apache_beam/yaml/yaml_provider.py | 27 +++++++++++++--- 2 files changed, 57 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py index 0b060b6a0ca..04a24642c23 100644 --- a/sdks/python/apache_beam/yaml/yaml_join.py +++ b/sdks/python/apache_beam/yaml/yaml_join.py @@ -178,26 +178,51 @@ def _SqlJoinTransform( fields: Optional[Dict[str, Any]] = None): """Joins two or more inputs using a specified condition. + For example:: + + type: Join + input: + input1: SomeTransform + input2: AnotherTransform + input3: YetAnotherTransform + config: + type: inner + equalities: + - input1: colA + input2: colB + - input2: colX + input3: colY + fields: + input1: [colA, colB, colC] + input2: {new_name: colB} + + would perform an inner join on the three inputs satisfying the constraints + that `input1.colA = input2.colB` and `input2.colX = input3.colY` + emitting rows with `colA`, `colB` and `colC` from `input1`, the values of + `input2.colB` as a field called `new_name`, and all the fields from `input3`. + Args: type: The type of join. Could be a string value in ["inner", "left", "right", "outer"] that specifies the type of join to be performed. For scenarios with multiple inputs to join where different join types are desired, specify the inputs to be outer joined. For - example, {outer: [input1, input2]} means that input1 & input2 will be - outer joined using the conditions specified, while other inputs will be - inner joined. + example, ``{outer: [input1, input2]}`` means that `input1` and `input2` + will be outer joined using the conditions specified, while other inputs + will be inner joined. equalities: The condition to join on. A list of sets of columns that should - be equal to fulfill the join condition. For the simple scenario to join - on the same column across all inputs and the column name is the same, - specify the column name as a str. + be equal to fulfill the join condition. For the simple scenario of + joining on the same column across all inputs where the column name is + the same, one can specify the column name as the equality rather than + having to list it for every input. fields: The fields to be outputted. A mapping with the input alias as the - key and the fields in the input to be outputted. The value in the map + key and the list of fields in the input to be outputted. + The value in the map can either be a dictionary with the new field name as the key and the original field name as the value (e.g new_field_name: field_name), or a list of the fields to be outputted with their original names - (e.g [col1, col2, col3]), or an '*' indicating all fields in the input - will be outputted. If not specified, all fields from all inputs will be - outputted. + (e.g ``[col1, col2, col3]``), or an '*' indicating all fields in the + input will be outputted. If not specified, all fields from all inputs + will be outputted. """ _validate_input(pcolls) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 52452daff7e..d5f6d03c284 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -424,7 +424,10 @@ class InlineProvider(Provider): return self._transform_factories.keys() def config_schema(self, typ): - factory = self._transform_factories[typ] + return self.config_schema_from_callable(self._transform_factories[typ]) + + @classmethod + def config_schema_from_callable(cls, factory): if isinstance(factory, type) and issubclass(factory, beam.PTransform): # https://bugs.python.org/issue40897 params = dict(inspect.signature(factory.__init__).parameters) @@ -442,7 +445,7 @@ class InlineProvider(Provider): docs = { param.arg_name: param.description - for param in self.get_docs(typ).params + for param in cls.get_docs(factory).params } names_and_types = [ @@ -455,17 +458,22 @@ class InlineProvider(Provider): ]) def description(self, typ): + return self.description_from_callable(self._transform_factories[typ]) + + @classmethod + def description_from_callable(cls, factory): def empty_if_none(s): return s or '' - docs = self.get_docs(typ) + docs = cls.get_docs(factory) return ( empty_if_none(docs.short_description) + ('\n\n' if docs.blank_after_short_description else '\n') + empty_if_none(docs.long_description)).strip() or None - def get_docs(self, typ): - docstring = self._transform_factories[typ].__doc__ or '' + @classmethod + def get_docs(cls, factory): + docstring = factory.__doc__ or '' # These "extra" docstring parameters are not relevant for YAML and mess # up the parsing. docstring = re.sub( @@ -511,6 +519,15 @@ class SqlBackedProvider(Provider): def provided_transforms(self): return self._transforms.keys() + def config_schema(self, type): + full_config = InlineProvider.config_schema_from_callable( + self._transforms[type]) + # Omit the (first) query -> transform parameter. + return schema_pb2.Schema(fields=full_config.fields[1:]) + + def description(self, type): + return InlineProvider.description_from_callable(self._transforms[type]) + def available(self): return self.sql_provider().available()