This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new daf81143388 Merge pull request #31449 Pass through docs (and configs) for SqlProviders.
daf81143388 is described below

commit daf811433886a5efc2df8f1616aefa9c42706c93
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Fri May 31 09:39:07 2024 -0700

    Merge pull request #31449 Pass through docs (and configs) for SqlProviders.
---
 sdks/python/apache_beam/yaml/yaml_join.py     | 45 +++++++++++++++++++++------
 sdks/python/apache_beam/yaml/yaml_provider.py | 27 +++++++++++++---
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py
index 0b060b6a0ca..04a24642c23 100644
--- a/sdks/python/apache_beam/yaml/yaml_join.py
+++ b/sdks/python/apache_beam/yaml/yaml_join.py
@@ -178,26 +178,51 @@ def _SqlJoinTransform(
     fields: Optional[Dict[str, Any]] = None):
   """Joins two or more inputs using a specified condition.
 
+  For example::
+
+      type: Join
+      input:
+        input1: SomeTransform
+        input2: AnotherTransform
+        input3: YetAnotherTransform
+      config:
+        type: inner
+        equalities:
+          - input1: colA
+            input2: colB
+          - input2: colX
+            input3: colY
+        fields:
+          input1: [colA, colB, colC]
+          input2: {new_name: colB}
+
+  would perform an inner join on the three inputs satisfying the constraints
+  that `input1.colA = input2.colB` and `input2.colX = input3.colY`
+  emitting rows with `colA`, `colB` and `colC` from `input1`, the values of
+  `input2.colB` as a field called `new_name`, and all the fields from `input3`.
+
   Args:
     type: The type of join. Could be a string value in
         ["inner", "left", "right", "outer"] that specifies the type of join to
         be performed. For scenarios with multiple inputs to join where different
         join types are desired, specify the inputs to be outer joined. For
-        example, {outer: [input1, input2]} means that input1 & input2 will be
-        outer joined using the conditions specified, while other inputs will be
-        inner joined.
+        example, ``{outer: [input1, input2]}`` means that `input1` and `input2`
+        will be outer joined using the conditions specified, while other inputs
+        will be inner joined.
     equalities: The condition to join on. A list of sets of columns that should
-        be equal to fulfill the join condition. For the simple scenario to join
-        on the same column across all inputs and the column name is the same,
-        specify the column name as a str.
+        be equal to fulfill the join condition. For the simple scenario of
+        joining on the same column across all inputs where the column name is
+        the same, one can specify the column name as the equality rather than
+        having to list it for every input.
     fields: The fields to be outputted. A mapping with the input alias as the
-        key and the fields in the input to be outputted. The value in the map
+        key and the list of fields in the input to be outputted.
+        The value in the map
         can either be a dictionary with the new field name as the key and the
         original field name as the value (e.g new_field_name: field_name), or a
         list of the fields to be outputted with their original names
-        (e.g [col1, col2, col3]), or an '*' indicating all fields in the input
-        will be outputted. If not specified, all fields from all inputs will be
-        outputted.
+        (e.g ``[col1, col2, col3]``), or an '*' indicating all fields in the
+        input will be outputted. If not specified, all fields from all inputs
+        will be outputted.
   """
 
   _validate_input(pcolls)
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 52452daff7e..d5f6d03c284 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -424,7 +424,10 @@ class InlineProvider(Provider):
     return self._transform_factories.keys()
 
   def config_schema(self, typ):
-    factory = self._transform_factories[typ]
+    return self.config_schema_from_callable(self._transform_factories[typ])
+
+  @classmethod
+  def config_schema_from_callable(cls, factory):
     if isinstance(factory, type) and issubclass(factory, beam.PTransform):
       # https://bugs.python.org/issue40897
       params = dict(inspect.signature(factory.__init__).parameters)
@@ -442,7 +445,7 @@ class InlineProvider(Provider):
 
     docs = {
         param.arg_name: param.description
-        for param in self.get_docs(typ).params
+        for param in cls.get_docs(factory).params
     }
 
     names_and_types = [
@@ -455,17 +458,22 @@ class InlineProvider(Provider):
         ])
 
   def description(self, typ):
+    return self.description_from_callable(self._transform_factories[typ])
+
+  @classmethod
+  def description_from_callable(cls, factory):
     def empty_if_none(s):
       return s or ''
 
-    docs = self.get_docs(typ)
+    docs = cls.get_docs(factory)
     return (
         empty_if_none(docs.short_description) +
         ('\n\n' if docs.blank_after_short_description else '\n') +
         empty_if_none(docs.long_description)).strip() or None
 
-  def get_docs(self, typ):
-    docstring = self._transform_factories[typ].__doc__ or ''
+  @classmethod
+  def get_docs(cls, factory):
+    docstring = factory.__doc__ or ''
     # These "extra" docstring parameters are not relevant for YAML and mess
     # up the parsing.
     docstring = re.sub(
@@ -511,6 +519,15 @@ class SqlBackedProvider(Provider):
   def provided_transforms(self):
     return self._transforms.keys()
 
+  def config_schema(self, type):
+    full_config = InlineProvider.config_schema_from_callable(
+        self._transforms[type])
+    # Omit the (first) query -> transform parameter.
+    return schema_pb2.Schema(fields=full_config.fields[1:])
+
+  def description(self, type):
+    return InlineProvider.description_from_callable(self._transforms[type])
+
   def available(self):
     return self.sql_provider().available()
 

Reply via email to