This is an automated email from the ASF dual-hosted git repository. austin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new de665b77d9f [YAML] - Normalize YAML PubSub format (#31068) de665b77d9f is described below commit de665b77d9ff1efc3c7ec88dfcf92bc40a0ad539 Author: Ferran Fernández Garrido <ffernandez....@gmail.com> AuthorDate: Tue Apr 23 16:02:11 2024 +0200 [YAML] - Normalize YAML PubSub format (#31068) * [YAML] - Normalize YAML PubSub format * [YAML] - Fix format * [YAML] - Fix import * [YAML] - Update tests and web --- sdks/python/apache_beam/yaml/yaml_io.py | 39 ++++++++++++++-------- sdks/python/apache_beam/yaml/yaml_io_test.py | 30 ++++++++--------- .../www/site/content/en/documentation/sdks/yaml.md | 14 ++++---- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 1cd7231e35d..227a36b33ca 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -24,6 +24,7 @@ implementations of the same transforms, the configs must be kept in sync. """ import io +import logging import os from typing import Any from typing import Callable @@ -164,16 +165,21 @@ def write_to_bigquery( def _create_parser( format, schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]: - if format == 'raw': + + if format.islower(): + format = format.upper() + logging.warning('Lowercase formats will be deprecated in version 2.60') + + if format == 'RAW': if schema: - raise ValueError('raw format does not take a schema') + raise ValueError('RAW format does not take a schema') return ( schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]), lambda payload: beam.Row(payload=payload)) - elif format == 'json': + elif format == 'JSON': beam_schema = json_utils.json_schema_to_beam_schema(schema) return beam_schema, json_utils.json_parser(beam_schema, schema) - elif format == 'avro': + elif format == 'AVRO': beam_schema = avroio.avro_schema_to_beam_schema(schema) covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema) # pylint: disable=line-too-long @@ -188,16 +194,21 @@ def _create_parser( def _create_formatter( format, schema: Any, beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]: - if format == 'raw': + + if format.islower(): + format = format.upper() + logging.warning('Lowercase formats will be deprecated in version 2.60') + + if format == 'RAW': if schema: - raise ValueError('raw format does not take a schema') + raise ValueError('RAW format does not take a schema') field_names = [field.name for field in beam_schema.fields] if len(field_names) != 1: raise ValueError(f'Expecting exactly one field, found {field_names}') return lambda row: getattr(row, field_names[0]) - elif format == 'json': + elif format == 'JSON': return json_utils.json_formater(beam_schema) - elif format == 'avro': + elif format == 'AVRO': avro_schema = schema or avroio.beam_schema_to_avro_schema(beam_schema) from_row = avroio.beam_row_to_avro_dict(avro_schema, beam_schema) @@ -238,10 +249,10 @@ def read_from_pubsub( format: The expected format of the message payload. Currently suported formats are - - raw: Produces records with a single `payload` field whose contents + - RAW: Produces records with a single `payload` field whose contents are the raw bytes of the pubsub message. - - avro: Parses records with a given avro schema. - - json: Parses records with a given json schema. + - AVRO: Parses records with a given Avro schema. + - JSON: Parses records with a given JSON schema. schema: Schema specification for the given format. attributes: List of attribute keys whose values will be flattened into the @@ -335,12 +346,12 @@ def write_to_pubsub( format: How to format the message payload. Currently suported formats are - - raw: Expects a message with a single field (excluding + - RAW: Expects a message with a single field (excluding attribute-related fields) whose contents are used as the raw bytes of the pubsub message. - - avro: Encodes records with a given avro schema, which may be inferred + - AVRO: Encodes records with a given Avro schema, which may be inferred from the input PCollection schema. - - json: Formats records with a given json schema, which may be inferred + - JSON: Formats records with a given JSON schema, which may be inferred from the input PCollection schema. schema: Schema specification for the given format. diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index 54fbac0fbeb..393e31de0e6 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -95,7 +95,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW ''') assert_that( result, @@ -115,7 +115,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW attributes: [attr] ''') assert_that( @@ -139,7 +139,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW attributes_map: attrMap ''') assert_that( @@ -163,7 +163,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW id_attribute: some_attr ''') assert_that( @@ -203,7 +203,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: avro + format: AVRO schema: %s ''' % json.dumps(self._avro_schema)) assert_that( @@ -227,7 +227,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -267,7 +267,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -300,7 +300,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -322,7 +322,7 @@ class YamlPubSubTest(unittest.TestCase): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -353,7 +353,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW ''')) def test_write_with_attribute(self): @@ -374,7 +374,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW attributes: [attr] ''')) @@ -396,7 +396,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW attributes_map: attrMap ''')) @@ -415,7 +415,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW id_attribute: some_attr ''')) @@ -438,7 +438,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: avro + format: AVRO ''')) def test_write_json(self): @@ -463,7 +463,7 @@ class YamlPubSubTest(unittest.TestCase): type: WriteToPubSub config: topic: my_topic - format: json + format: JSON attributes: [label] attributes_map: other ''')) diff --git a/website/www/site/content/en/documentation/sdks/yaml.md b/website/www/site/content/en/documentation/sdks/yaml.md index 5c34e993982..530ccf1177c 100644 --- a/website/www/site/content/en/documentation/sdks/yaml.md +++ b/website/www/site/content/en/documentation/sdks/yaml.md @@ -424,7 +424,7 @@ pipeline: - type: ReadFromPubSub config: topic: myPubSubTopic - format: json + format: JSON schema: type: object properties: @@ -441,7 +441,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -469,7 +469,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -496,7 +496,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -556,7 +556,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -581,7 +581,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON windowing: type: fixed size: 60 @@ -700,7 +700,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ```