This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4891a81a544 [YAML] Add and cleanup documentation for several builtin
transforms. (#29673)
4891a81a544 is described below
commit 4891a81a5449f941297b4ed12b9ed1cb5a9d2629
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Dec 8 17:20:22 2023 -0800
[YAML] Add and cleanup documentation for several builtin transforms.
(#29673)
---
.../transforms/SchemaTransformProvider.java | 5 ++
.../sdk/expansion/service/ExpansionService.java | 1 +
.../SqlTransformSchemaTransformProvider.java | 17 +++-
sdks/python/apache_beam/transforms/external.py | 22 +++++-
.../apache_beam/yaml/standard_providers.yaml | 14 ++--
sdks/python/apache_beam/yaml/yaml_mapping.py | 22 ++++++
sdks/python/apache_beam/yaml/yaml_provider.py | 90 +++++++++++++++++++---
7 files changed, 149 insertions(+), 22 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
index e542007c9a5..e73ec5d870c 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java
@@ -36,6 +36,11 @@ public interface SchemaTransformProvider {
/** Returns an id that uniquely represents this transform. */
String identifier();
+ /** Returns a description of this transform to be used for documentation. */
+ default String description() {
+ return "";
+ }
+
/**
* Returns the expected schema of the configuration object. Note this is
distinct from the schema
* of the transform itself.
diff --git
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index 43690c60370..7760cab64ac 100644
---
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -767,6 +767,7 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
transformProvider.getAllProviders()) {
SchemaTransformConfig.Builder schemaTransformConfigBuilder =
SchemaTransformConfig.newBuilder();
+ schemaTransformConfigBuilder.setDescription(provider.description());
schemaTransformConfigBuilder.setConfigSchema(
SchemaTranslation.schemaToProto(provider.configurationSchema(),
true));
schemaTransformConfigBuilder.addAllInputPcollectionNames(provider.inputCollectionNames());
diff --git
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
index 54415644152..f032da0799d 100644
---
a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
+++
b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -71,6 +71,21 @@ public class SqlTransformSchemaTransformProvider implements
SchemaTransformProvi
return "schematransform:org.apache.beam:sql_transform:v1";
}
+ @Override
+ public String description() {
+ return "A transform that executes a SQL query on its input
PCollections.\n\n"
+ + "If a single input is given, it may be referred to as `PCOLLECTION`,
e.g. the query could be of the form"
+ + "\n\n"
+ + " SELECT a, sum(b) FROM PCOLLECTION"
+ + "\n\n"
+ + "If multiple inputs are given, they should be named as they are in
the query, e.g."
+ + "\n\n"
+ + " SELECT a, b, c FROM pcoll_1 join pcoll_2 using (b)"
+ + "\n\n"
+ + "For more details about Beam SQL in general see "
+ + "[the Beam SQL
documentation](https://beam.apache.org/documentation/dsls/sql/overview/).";
+ }
+
@Override
public Schema configurationSchema() {
List<String> providers = new ArrayList<>();
@@ -82,7 +97,7 @@ public class SqlTransformSchemaTransformProvider implements
SchemaTransformProvi
EnumerationType providerEnum = EnumerationType.create(providers);
return Schema.of(
- Schema.Field.of("query", Schema.FieldType.STRING),
+ Schema.Field.of("query", Schema.FieldType.STRING).withDescription("SQL
query to execute"),
Schema.Field.nullable(
"ddl", Schema.FieldType.STRING), // TODO: Underlying builder seems
more capable?
Schema.Field.nullable("dialect",
Schema.FieldType.logicalType(QUERY_ENUMERATION)),
diff --git a/sdks/python/apache_beam/transforms/external.py
b/sdks/python/apache_beam/transforms/external.py
index a69ecbaee22..997cea347d3 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -41,6 +41,7 @@ from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import external_transforms_pb2
+from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import artifact_service
from apache_beam.transforms import environments
@@ -51,6 +52,7 @@ from apache_beam.typehints import row_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import Union
from apache_beam.typehints.typehints import UnionConstraint
@@ -450,8 +452,24 @@ class SchemaAwareExternalTransform(ptransform.PTransform):
schema = named_tuple_from_schema(proto_config.config_schema)
except Exception as exn:
if ignore_errors:
- logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
- continue
+ truncated_schema = schema_pb2.Schema()
+ truncated_schema.CopyFrom(proto_config.config_schema)
+ for field in truncated_schema.fields:
+ try:
+ typing_from_runner_api(field.type)
+ except Exception:
+ if field.type.nullable:
+ # Set it to an empty placeholder type.
+ field.type.CopyFrom(
+ schema_pb2.FieldType(
+ nullable=True,
+ row_type=schema_pb2.RowType(
+ schema=schema_pb2.Schema())))
+ try:
+ schema = named_tuple_from_schema(truncated_schema)
+ except Exception as exn:
+ logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
+ continue
else:
raise
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml
b/sdks/python/apache_beam/yaml/standard_providers.yaml
index c612d441208..0f1bc14c47c 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -18,23 +18,18 @@
# TODO(robertwb): Add more providers.
# TODO(robertwb): Perhaps auto-generate this file?
-- type: 'beamJar'
- config:
- gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
- version: BEAM_VERSION
- transforms:
- Sql: 'beam:external:java:sql:v1'
- MapToFields-java:
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
- MapToFields-generic:
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
-
- type: renaming
transforms:
+ 'Sql': 'Sql'
'MapToFields-java': 'MapToFields-java'
'MapToFields-generic': 'MapToFields-java'
'Filter-java': 'Filter-java'
'Explode': 'Explode'
config:
mappings:
+ 'Sql':
+ query: 'query'
+ # Unfortunately dialect is a java logical type.
'MapToFields-generic':
language: 'language'
append: 'append'
@@ -57,6 +52,7 @@
underlying_provider:
type: beamJar
transforms:
+ Sql: "schematransform:org.apache.beam:sql_transform:v1"
MapToFields-java:
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
Filter-java: "beam:schematransform:org.apache.beam:yaml:filter-java:v1"
Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1"
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py
b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 08c7a59819a..0ce706bbea5 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -326,6 +326,28 @@ def
maybe_with_exception_handling_transform_fn(transform_fn):
class _Explode(beam.PTransform):
+ """Explodes (aka unnest/flatten) one or more fields producing multiple rows.
+
+ Given one or more fields of iterable type, produces multiple rows, one for
+ each value of that field. For example, a row of the form `('a', [1, 2, 3])`
+ would expand to `('a', 1)`, `('a', 2)`, and `('a', 3)` when exploded on
+ the second field.
+
+ This is akin to a `FlatMap` when paired with the MapToFields transform.
+
+ Args:
+ fields: The list of fields to expand.
+ cross_product: If multiple fields are specified, indicates whether the
+ full cross-product of combinations should be produced, or if the
+ first element of the first field corresponds to the first element
+ of the second field, etc. For example, the row
+ `(['a', 'b'], [1, 2])` would expand to the four rows
+ `('a', 1)`, `('a', 2)`, `('b', 1)`, and `('b', 2)` when
+ `cross_product` is set to `true` but only the two rows
+ `('a', 1)` and `('b', 2)` when it is set to `false`.
+ Only meaningful (and required) if multiple fields are specified.
+ error_handling: Whether and how to handle errors during iteration.
+ """
def __init__(
self,
fields: Union[str, Collection[str]],
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py
b/sdks/python/apache_beam/yaml/yaml_provider.py
index 6a2d313183e..f6078769c65 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -546,7 +546,7 @@ def dicts_to_rows(o):
def create_builtin_provider():
- def create(elements: Iterable[Any], reshuffle: bool = True):
+ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True):
"""Creates a collection containing a specified set of elements.
YAML/JSON-style mappings will be interpreted as Beam rows. For example::
@@ -560,17 +560,48 @@ def create_builtin_provider():
Args:
elements: The set of elements that should belong to the PCollection.
YAML/JSON-style mappings will be interpreted as Beam rows.
- reshuffle (optional): Whether to introduce a reshuffle if there is more
- than one element in the collection. Defaults to True.
+ reshuffle (optional): Whether to introduce a reshuffle (to possibly
+ redistribute the work) if there is more than one element in the
+ collection. Defaults to True.
"""
- return beam.Create([element_to_rows(e) for e in elements], reshuffle)
+ return beam.Create([element_to_rows(e) for e in elements],
+ reshuffle=reshuffle is not False)
# Or should this be posargs, args?
# pylint: disable=dangerous-default-value
def fully_qualified_named_transform(
constructor: str,
- args: Iterable[Any] = (),
- kwargs: Mapping[str, Any] = {}):
+ args: Optional[Iterable[Any]] = (),
+ kwargs: Optional[Mapping[str, Any]] = {}):
+ """A Python PTransform identified by fully qualified name.
+
+ This allows one to import, construct, and apply any Beam Python transform.
+ This can be useful for using transforms that have not yet been exposed
+ via a YAML interface. Note, however, that conversion may be required if
this
+ transform does not accept or produce Beam Rows.
+
+ For example,
+
+ type: PyTransform
+ config:
+ constructor: apache_beam.pkg.mod.SomeClass
+ args: [1, 'foo']
+ kwargs:
+ baz: 3
+
+ can be used to access the transform
+ `apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)`.
+
+ Args:
+ constructor: Fully qualified name of a callable used to construct the
+ transform. Often this is a class such as
+ `apache_beam.pkg.mod.SomeClass` but it can also be a function or
+ any other callable that returns a PTransform.
+ args: A list of parameters to pass to the callable as positional
+ arguments.
+ kwargs: A mapping of parameters to pass to the callable as keyword
+ arguments.
+ """
with FullyQualifiedNamedTransform.with_filter('*'):
return constructor >> FullyQualifiedNamedTransform(
constructor, args, kwargs)
@@ -579,6 +610,19 @@ def create_builtin_provider():
# exactly zero or one PCollection in yaml (as they would be interpreted as
# PBegin and the PCollection itself respectively).
class Flatten(beam.PTransform):
+ """Flattens multiple PCollections into a single PCollection.
+
+ The elements of the resulting PCollection will be the (disjoint) union of
+ all the elements of all the inputs.
+
+ Note that in YAML, transforms can always take a list of inputs which will
+ be implicitly flattened.
+ """
+ def __init__(self):
+ # Suppress the "label" argument from the superclass for better docs.
+ # pylint: disable=useless-parent-delegation
+ super().__init__()
+
def expand(self, pcolls):
if isinstance(pcolls, beam.PCollection):
pipeline_arg = {}
@@ -592,6 +636,24 @@ def create_builtin_provider():
return pcolls | beam.Flatten(**pipeline_arg)
class WindowInto(beam.PTransform):
+ # pylint: disable=line-too-long
+
+ """A window transform assigning windows to each element of a PCollection.
+
+ The assigned windows will affect all downstream aggregating operations,
+ which will aggregate by window as well as by key.
+
+ See [the Beam documentation on
windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
+ for more details.
+
+ Note that any Yaml transform can have a
+ [windowing
parameter](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md#windowing),
+ which is applied to its inputs (if any) or outputs (if there are no inputs)
+ which means that explicit WindowInto operations are not typically needed.
+
+ Args:
+ windowing: the type and parameters of the windowing to perform
+ """
def __init__(self, windowing):
self._window_transform = self._parse_window_spec(windowing)
@@ -617,13 +679,21 @@ def create_builtin_provider():
# TODO: Triggering, etc.
return beam.WindowInto(window_fn)
- def log_and_return(x):
- logging.info(x)
- return x
+ def LogForTesting():
+ """Logs each element of its input PCollection.
+
+ The output of this transform is a copy of its input for ease of use in
+ chain-style pipelines.
+ """
+ def log_and_return(x):
+ logging.info(x)
+ return x
+
+ return beam.Map(log_and_return)
return InlineProvider({
'Create': create,
- 'LogForTesting': lambda: beam.Map(log_and_return),
+ 'LogForTesting': LogForTesting,
'PyTransform': fully_qualified_named_transform,
'Flatten': Flatten,
'WindowInto': WindowInto,