This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4891a81a544 [YAML] Add and cleanup documentation for several builtin 
transforms. (#29673)
4891a81a544 is described below

commit 4891a81a5449f941297b4ed12b9ed1cb5a9d2629
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Dec 8 17:20:22 2023 -0800

    [YAML] Add and cleanup documentation for several builtin transforms. 
(#29673)
---
 .../transforms/SchemaTransformProvider.java        |  5 ++
 .../sdk/expansion/service/ExpansionService.java    |  1 +
 .../SqlTransformSchemaTransformProvider.java       | 17 +++-
 sdks/python/apache_beam/transforms/external.py     | 22 +++++-
 .../apache_beam/yaml/standard_providers.yaml       | 14 ++--
 sdks/python/apache_beam/yaml/yaml_mapping.py       | 22 ++++++
 sdks/python/apache_beam/yaml/yaml_provider.py      | 90 +++++++++++++++++++---
 7 files changed, 149 insertions(+), 22 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
index e542007c9a5..e73ec5d870c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
@@ -36,6 +36,11 @@ public interface SchemaTransformProvider {
   /** Returns an id that uniquely represents this transform. */
   String identifier();
 
+  /** Returns a description of this transform to be used for documentation. */
+  default String description() {
+    return "";
+  }
+
   /**
    * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
    * of the transform itself.
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index 43690c60370..7760cab64ac 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -767,6 +767,7 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
         transformProvider.getAllProviders()) {
       SchemaTransformConfig.Builder schemaTransformConfigBuilder =
           SchemaTransformConfig.newBuilder();
+      schemaTransformConfigBuilder.setDescription(provider.description());
       schemaTransformConfigBuilder.setConfigSchema(
           SchemaTranslation.schemaToProto(provider.configurationSchema(), 
true));
       
schemaTransformConfigBuilder.addAllInputPcollectionNames(provider.inputCollectionNames());
diff --git 
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 54415644152..f032da0799d 100644
--- 
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++ 
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -71,6 +71,21 @@ public class SqlTransformSchemaTransformProvider implements 
SchemaTransformProvi
     return "schematransform:org.apache.beam:sql_transform:v1";
   }
 
+  @Override
+  public String description() {
+    return "A transform that executes a SQL query on its input 
PCollections.\n\n"
+        + "If a single input is given, it may be referred to as `PCOLLECTION`, 
e.g. the query could be of the form"
+        + "\n\n"
+        + "    SELECT a, sum(b) FROM PCOLLECTION"
+        + "\n\n"
+        + "If multiple inputs are given, the should be named as they are in 
the query, e.g."
+        + "\n\n"
+        + "    SELECT a, b, c FROM pcoll_1 join pcoll_2 using (b)"
+        + "\n\n"
+        + "For more details about Beam SQL in general see "
+        + "[the Beam SQL 
documentation](https://beam.apache.org/documentation/dsls/sql/overview/).";
+  }
+
   @Override
   public Schema configurationSchema() {
     List<String> providers = new ArrayList<>();
@@ -82,7 +97,7 @@ public class SqlTransformSchemaTransformProvider implements 
SchemaTransformProvi
     EnumerationType providerEnum = EnumerationType.create(providers);
 
     return Schema.of(
-        Schema.Field.of("query", Schema.FieldType.STRING),
+        Schema.Field.of("query", Schema.FieldType.STRING).withDescription("SQL 
query to execute"),
         Schema.Field.nullable(
             "ddl", Schema.FieldType.STRING), // TODO: Underlying builder seems 
more capable?
         Schema.Field.nullable("dialect", 
Schema.FieldType.logicalType(QUERY_ENUMERATION)),
diff --git a/sdks/python/apache_beam/transforms/external.py 
b/sdks/python/apache_beam/transforms/external.py
index a69ecbaee22..997cea347d3 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -41,6 +41,7 @@ from apache_beam.portability.api import beam_expansion_api_pb2
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import external_transforms_pb2
+from apache_beam.portability.api import schema_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import artifact_service
 from apache_beam.transforms import environments
@@ -51,6 +52,7 @@ from apache_beam.typehints import row_type
 from apache_beam.typehints.schemas import named_fields_to_schema
 from apache_beam.typehints.schemas import named_tuple_from_schema
 from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
 from apache_beam.typehints.trivial_inference import instance_to_type
 from apache_beam.typehints.typehints import Union
 from apache_beam.typehints.typehints import UnionConstraint
@@ -450,8 +452,24 @@ class SchemaAwareExternalTransform(ptransform.PTransform):
         schema = named_tuple_from_schema(proto_config.config_schema)
       except Exception as exn:
         if ignore_errors:
-          logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
-          continue
+          truncated_schema = schema_pb2.Schema()
+          truncated_schema.CopyFrom(proto_config.config_schema)
+          for field in truncated_schema.fields:
+            try:
+              typing_from_runner_api(field.type)
+            except Exception:
+              if field.type.nullable:
+                # Set it to an empty placeholder type.
+                field.type.CopyFrom(
+                    schema_pb2.FieldType(
+                        nullable=True,
+                        row_type=schema_pb2.RowType(
+                            schema=schema_pb2.Schema())))
+          try:
+            schema = named_tuple_from_schema(truncated_schema)
+          except Exception as exn:
+            logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
+            continue
         else:
           raise
 
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml 
b/sdks/python/apache_beam/yaml/standard_providers.yaml
index c612d441208..0f1bc14c47c 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -18,23 +18,18 @@
 # TODO(robertwb): Add more providers.
 # TODO(robertwb): Perhaps auto-generate this file?
 
-- type: 'beamJar'
-  config:
-    gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
-    version: BEAM_VERSION
-  transforms:
-     Sql: 'beam:external:java:sql:v1'
-     MapToFields-java: 
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
-     MapToFields-generic: 
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
-
 - type: renaming
   transforms:
+    'Sql': 'Sql'
     'MapToFields-java': 'MapToFields-java'
     'MapToFields-generic': 'MapToFields-java'
     'Filter-java': 'Filter-java'
     'Explode': 'Explode'
   config:
     mappings:
+      'Sql':
+        query: 'query'
+        # Unfortunately dialect is a java logical type.
       'MapToFields-generic':
         language: 'language'
         append: 'append'
@@ -57,6 +52,7 @@
     underlying_provider:
       type: beamJar
       transforms:
+        Sql: "schematransform:org.apache.beam:sql_transform:v1"
         MapToFields-java: 
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
         Filter-java: "beam:schematransform:org.apache.beam:yaml:filter-java:v1"
         Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1"
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py 
b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 08c7a59819a..0ce706bbea5 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -326,6 +326,28 @@ def 
maybe_with_exception_handling_transform_fn(transform_fn):
 
 
 class _Explode(beam.PTransform):
+  """Explodes (aka unnest/flatten) one or more fields producing multiple rows.
+
+  Given one or more fields of iterable type, produces multiple rows, one for
+  each value of that field. For example, a row of the form `('a', [1, 2, 3])`
+  would expand to `('a', 1)`, `('a', 2)`, and `('a', 3)` when exploded on
+  the second field.
+
+  This is akin to a `FlatMap` when paired with the MapToFields transform.
+
+  Args:
+      fields: The list of fields to expand.
+      cross_product: If multiple fields are specified, indicates whether the
+          full cross-product of combinations should be produced, or if the
+          first element of the first field corresponds to the first element
+          of the second field, etc. For example, the row
+          `(['a', 'b'], [1, 2])` would expand to the four rows
+          `('a', 1)`, `('a', 2)`, `('b', 1)`, and `('b', 2)` when
+          `cross_product` is set to `true` but only the two rows
+          `('a', 1)` and `('b', 2)` when it is set to `false`.
+          Only meaningful (and required) if multiple fields are specified.
+      error_handling: Whether and how to handle errors during iteration.
+  """
   def __init__(
       self,
       fields: Union[str, Collection[str]],
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py 
b/sdks/python/apache_beam/yaml/yaml_provider.py
index 6a2d313183e..f6078769c65 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -546,7 +546,7 @@ def dicts_to_rows(o):
 
 
 def create_builtin_provider():
-  def create(elements: Iterable[Any], reshuffle: bool = True):
+  def create(elements: Iterable[Any], reshuffle: Optional[bool] = True):
     """Creates a collection containing a specified set of elements.
 
     YAML/JSON-style mappings will be interpreted as Beam rows. For example::
@@ -560,17 +560,48 @@ def create_builtin_provider():
     Args:
         elements: The set of elements that should belong to the PCollection.
             YAML/JSON-style mappings will be interpreted as Beam rows.
-        reshuffle (optional): Whether to introduce a reshuffle if there is more
-            than one element in the collection. Defaults to True.
+        reshuffle (optional): Whether to introduce a reshuffle (to possibly
+            redistribute the work) if there is more than one element in the
+            collection. Defaults to True.
     """
-    return beam.Create([element_to_rows(e) for e in elements], reshuffle)
+    return beam.Create([element_to_rows(e) for e in elements],
+                       reshuffle=reshuffle is not False)
 
   # Or should this be posargs, args?
   # pylint: disable=dangerous-default-value
   def fully_qualified_named_transform(
       constructor: str,
-      args: Iterable[Any] = (),
-      kwargs: Mapping[str, Any] = {}):
+      args: Optional[Iterable[Any]] = (),
+      kwargs: Optional[Mapping[str, Any]] = {}):
+    """A Python PTransform identified by fully qualified name.
+
+    This allows one to import, construct, and apply any Beam Python transform.
+    This can be useful for using transforms that have not yet been exposed
+    via a YAML interface. Note, however, that conversion may be required if 
this
+    transform does not accept or produce Beam Rows.
+
+    For example,
+
+        type: PyTransform
+        config:
+          constructor: apache_beam.pkg.mod.SomeClass
+          args: [1, 'foo']
+          kwargs:
+             baz: 3
+
+    can be used to access the transform
+    `apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)`.
+
+    Args:
+        constructor: Fully qualified name of a callable used to construct the
+            transform.  Often this is a class such as
+            `apache_beam.pkg.mod.SomeClass` but it can also be a function or
+            any other callable that returns a PTransform.
+        args: A list of parameters to pass to the callable as positional
+            arguments.
+        kwargs: A mapping of parameters to pass to the callable as keyword
+            arguments.
+    """
     with FullyQualifiedNamedTransform.with_filter('*'):
       return constructor >> FullyQualifiedNamedTransform(
           constructor, args, kwargs)
@@ -579,6 +610,19 @@ def create_builtin_provider():
   # exactly zero or one PCollection in yaml (as they would be interpreted as
   # PBegin and the PCollection itself respectively).
   class Flatten(beam.PTransform):
+    """Flattens multiple PCollections into a single PCollection.
+
+    The elements of the resulting PCollection will be the (disjoint) union of
+    all the elements of all the inputs.
+
+    Note that in YAML, transforms can always take a list of inputs, which will
+    be implicitly flattened.
+    """
+    def __init__(self):
+      # Suppress the "label" argument from the superclass for better docs.
+      # pylint: disable=useless-parent-delegation
+      super().__init__()
+
     def expand(self, pcolls):
       if isinstance(pcolls, beam.PCollection):
         pipeline_arg = {}
@@ -592,6 +636,24 @@ def create_builtin_provider():
       return pcolls | beam.Flatten(**pipeline_arg)
 
   class WindowInto(beam.PTransform):
+    # pylint: disable=line-too-long
+
+    """A window transform assigning windows to each element of a PCollection.
+
+    The assigned windows will affect all downstream aggregating operations,
+    which will aggregate by window as well as by key.
+
+    See [the Beam documentation on 
windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+    for more details.
+
+    Note that any YAML transform can have a
+    [windowing 
parameter](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md#windowing),
+    which is applied to its inputs (if any) or outputs (if there are no inputs)
+    which means that explicit WindowInto operations are not typically needed.
+
+    Args:
+      windowing: the type and parameters of the windowing to perform
+    """
     def __init__(self, windowing):
       self._window_transform = self._parse_window_spec(windowing)
 
@@ -617,13 +679,21 @@ def create_builtin_provider():
       # TODO: Triggering, etc.
       return beam.WindowInto(window_fn)
 
-  def log_and_return(x):
-    logging.info(x)
-    return x
+  def LogForTesting():
+    """Logs each element of its input PCollection.
+
+    The output of this transform is a copy of its input for ease of use in
+    chain-style pipelines.
+    """
+    def log_and_return(x):
+      logging.info(x)
+      return x
+
+    return beam.Map(log_and_return)
 
   return InlineProvider({
       'Create': create,
-      'LogForTesting': lambda: beam.Map(log_and_return),
+      'LogForTesting': LogForTesting,
       'PyTransform': fully_qualified_named_transform,
       'Flatten': Flatten,
       'WindowInto': WindowInto,

Reply via email to