This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new de665b77d9f [YAML] - Normalize YAML PubSub format (#31068)
de665b77d9f is described below

commit de665b77d9ff1efc3c7ec88dfcf92bc40a0ad539
Author: Ferran Fernández Garrido <ffernandez....@gmail.com>
AuthorDate: Tue Apr 23 16:02:11 2024 +0200

    [YAML] - Normalize YAML PubSub format (#31068)
    
    * [YAML] - Normalize YAML PubSub format
    
    * [YAML] - Fix format
    
    * [YAML] - Fix import
    
    * [YAML] - Update tests and web
---
 sdks/python/apache_beam/yaml/yaml_io.py            | 39 ++++++++++++++--------
 sdks/python/apache_beam/yaml/yaml_io_test.py       | 30 ++++++++---------
 .../www/site/content/en/documentation/sdks/yaml.md | 14 ++++----
 3 files changed, 47 insertions(+), 36 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index 1cd7231e35d..227a36b33ca 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -24,6 +24,7 @@ implementations of the same transforms, the configs must be kept in sync.
 """
 
 import io
+import logging
 import os
 from typing import Any
 from typing import Callable
@@ -164,16 +165,21 @@ def write_to_bigquery(
 def _create_parser(
     format,
     schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]:
-  if format == 'raw':
+
+  if format.islower():
+    format = format.upper()
+    logging.warning('Lowercase formats will be deprecated in version 2.60')
+
+  if format == 'RAW':
     if schema:
-      raise ValueError('raw format does not take a schema')
+      raise ValueError('RAW format does not take a schema')
     return (
         schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
         lambda payload: beam.Row(payload=payload))
-  elif format == 'json':
+  elif format == 'JSON':
     beam_schema = json_utils.json_schema_to_beam_schema(schema)
     return beam_schema, json_utils.json_parser(beam_schema, schema)
-  elif format == 'avro':
+  elif format == 'AVRO':
     beam_schema = avroio.avro_schema_to_beam_schema(schema)
     covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema)
     # pylint: disable=line-too-long
@@ -188,16 +194,21 @@ def _create_parser(
 def _create_formatter(
     format, schema: Any,
     beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]:
-  if format == 'raw':
+
+  if format.islower():
+    format = format.upper()
+    logging.warning('Lowercase formats will be deprecated in version 2.60')
+
+  if format == 'RAW':
     if schema:
-      raise ValueError('raw format does not take a schema')
+      raise ValueError('RAW format does not take a schema')
     field_names = [field.name for field in beam_schema.fields]
     if len(field_names) != 1:
       raise ValueError(f'Expecting exactly one field, found {field_names}')
     return lambda row: getattr(row, field_names[0])
-  elif format == 'json':
+  elif format == 'JSON':
     return json_utils.json_formater(beam_schema)
-  elif format == 'avro':
+  elif format == 'AVRO':
     avro_schema = schema or avroio.beam_schema_to_avro_schema(beam_schema)
     from_row = avroio.beam_row_to_avro_dict(avro_schema, beam_schema)
 
@@ -238,10 +249,10 @@ def read_from_pubsub(
     format: The expected format of the message payload.  Currently suported
       formats are
 
-        - raw: Produces records with a single `payload` field whose contents
+        - RAW: Produces records with a single `payload` field whose contents
             are the raw bytes of the pubsub message.
-        - avro: Parses records with a given avro schema.
-        - json: Parses records with a given json schema.
+        - AVRO: Parses records with a given Avro schema.
+        - JSON: Parses records with a given JSON schema.
 
     schema: Schema specification for the given format.
     attributes: List of attribute keys whose values will be flattened into the
@@ -335,12 +346,12 @@ def write_to_pubsub(
     format: How to format the message payload.  Currently suported
       formats are
 
-        - raw: Expects a message with a single field (excluding
+        - RAW: Expects a message with a single field (excluding
             attribute-related fields) whose contents are used as the raw bytes
             of the pubsub message.
-        - avro: Encodes records with a given avro schema, which may be inferred
+        - AVRO: Encodes records with a given Avro schema, which may be inferred
             from the input PCollection schema.
-        - json: Formats records with a given json schema, which may be inferred
+        - JSON: Formats records with a given JSON schema, which may be inferred
             from the input PCollection schema.
 
     schema: Schema specification for the given format.
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index 54fbac0fbeb..393e31de0e6 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -95,7 +95,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
             ''')
         assert_that(
             result,
@@ -115,7 +115,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               attributes: [attr]
             ''')
         assert_that(
@@ -139,7 +139,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               attributes_map: attrMap
             ''')
         assert_that(
@@ -163,7 +163,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               id_attribute: some_attr
             ''')
         assert_that(
@@ -203,7 +203,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: avro
+              format: AVRO
               schema: %s
             ''' % json.dumps(self._avro_schema))
         assert_that(
@@ -227,7 +227,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: json
+              format: JSON
               schema:
                 type: object
                 properties:
@@ -267,7 +267,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: json
+              format: JSON
               schema:
                 type: object
                 properties:
@@ -300,7 +300,7 @@ class YamlPubSubTest(unittest.TestCase):
               type: ReadFromPubSub
               config:
                 topic: my_topic
-                format: json
+                format: JSON
                 schema:
                   type: object
                   properties:
@@ -322,7 +322,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: ReadFromPubSub
             config:
               topic: my_topic
-              format: json
+              format: JSON
               schema:
                 type: object
                 properties:
@@ -353,7 +353,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
             '''))
 
   def test_write_with_attribute(self):
@@ -374,7 +374,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               attributes: [attr]
             '''))
 
@@ -396,7 +396,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               attributes_map: attrMap
             '''))
 
@@ -415,7 +415,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: raw
+              format: RAW
               id_attribute: some_attr
             '''))
 
@@ -438,7 +438,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: avro
+              format: AVRO
             '''))
 
   def test_write_json(self):
@@ -463,7 +463,7 @@ class YamlPubSubTest(unittest.TestCase):
             type: WriteToPubSub
             config:
               topic: my_topic
-              format: json
+              format: JSON
               attributes: [label]
               attributes_map: other
             '''))
diff --git a/website/www/site/content/en/documentation/sdks/yaml.md b/website/www/site/content/en/documentation/sdks/yaml.md
index 5c34e993982..530ccf1177c 100644
--- a/website/www/site/content/en/documentation/sdks/yaml.md
+++ b/website/www/site/content/en/documentation/sdks/yaml.md
@@ -424,7 +424,7 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
-        format: json
+        format: JSON
         schema:
           type: object
           properties:
@@ -441,7 +441,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
 options:
   streaming: true
 ```
@@ -469,7 +469,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
 options:
   streaming: true
 ```
@@ -496,7 +496,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
 options:
   streaming: true
 ```
@@ -556,7 +556,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
 options:
   streaming: true
 ```
@@ -581,7 +581,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
   windowing:
     type: fixed
     size: 60
@@ -700,7 +700,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
-        format: json
+        format: JSON
 options:
   streaming: true
 ```

Reply via email to