This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a3f9e0e7d96 Add publish_time_field to ReadFromPubSub YAML transform (#38985)
a3f9e0e7d96 is described below

commit a3f9e0e7d96a0d667dd75494e2079e18e20cc821
Author: Lalit Yadav <[email protected]>
AuthorDate: Wed Jun 17 19:16:03 2026 -0500

    Add publish_time_field to ReadFromPubSub YAML transform (#38985)
    
    * Add publish_time_field to ReadFromPubSub YAML transform
    
    * Format YAML PubSub publish time test
    
    * Fix YAML PubSub publish time handling
---
 sdks/python/apache_beam/yaml/yaml_io.py      |  21 +++++-
 sdks/python/apache_beam/yaml/yaml_io_test.py | 103 +++++++++++++++++++++++++++
 2 files changed, 121 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index 989661a6eae..77cbc41def3 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -46,6 +46,7 @@ from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
 from apache_beam.portability.api import schema_pb2
 from apache_beam.typehints import schemas
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.yaml import json_utils
 from apache_beam.yaml import yaml_errors
 from apache_beam.yaml import yaml_provider
@@ -316,7 +317,8 @@ def read_from_pubsub(
     attributes: Optional[Iterable[str]] = None,
     attributes_map: Optional[str] = None,
     id_attribute: Optional[str] = None,
-    timestamp_attribute: Optional[str] = None):
+    timestamp_attribute: Optional[str] = None,
+    publish_time_field: Optional[str] = None):
   """Reads messages from Cloud Pub/Sub.
 
   Args:
@@ -366,14 +368,19 @@ def read_from_pubsub(
         ``2015-10-29T23:41:41.123Z``. The sub-second component of the
         timestamp is optional, and digits beyond the first three (i.e., time
         units smaller than milliseconds) may be ignored.
+    publish_time_field: Field to add to output messages with the Pub/Sub
+      message publish time. If None, no such field is added.
   """
   if topic and subscription:
     raise TypeError('Only one of topic and subscription may be specified.')
   elif not topic and not subscription:
     raise TypeError('One of topic or subscription may be specified.')
+  if publish_time_field is not None and not publish_time_field.strip():
+    raise ValueError('publish_time_field must be a non-empty field name.')
+  has_publish_time_field = publish_time_field is not None
   payload_schema, parser = _create_parser(format, schema)
   extra_fields: list[schema_pb2.Field] = []
-  if not attributes and not attributes_map:
+  if not attributes and not attributes_map and not has_publish_time_field:
     mapper = lambda msg: parser(msg)
   else:
     if isinstance(attributes, str):
@@ -384,6 +391,9 @@ def read_from_pubsub(
     if attributes_map:
       extra_fields.append(
           schemas.schema_field(attributes_map, Mapping[str, str]))
+    if has_publish_time_field:
+      extra_fields.append(
+          schemas.schema_field(publish_time_field, Optional[Timestamp]))
 
     def mapper(msg):
       values = parser(msg.data).as_dict()
@@ -393,6 +403,10 @@ def read_from_pubsub(
           values[attr] = msg.attributes[attr]
       if attributes_map:
         values[attributes_map] = msg.attributes
+      if has_publish_time_field:
+        values[publish_time_field] = (
+            Timestamp.of(msg.publish_time)
+            if msg.publish_time is not None else None)
       return beam.Row(**values)
 
   output = (
@@ -400,7 +414,8 @@ def read_from_pubsub(
       | beam.io.ReadFromPubSub(
           topic=topic,
           subscription=subscription,
-          with_attributes=bool(attributes or attributes_map),
+          with_attributes=bool(
+              attributes or attributes_map or has_publish_time_field),
           id_label=id_attribute,
           timestamp_attribute=timestamp_attribute)
       | 'ParseMessage' >> beam.Map(mapper))
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index e6219277bf5..250a54689f5 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import datetime
 import io
 import json
 import logging
@@ -32,6 +33,7 @@ from apache_beam.testing.util import AssertThat
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.typehints import schemas as schema_utils
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.yaml.yaml_transform import YamlTransform
 
 try:
@@ -181,6 +183,107 @@ class YamlPubSubTest(unittest.TestCase):
                 beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
             ]))
 
+  def test_read_with_publish_time_field(self):
+    publish_time_1 = datetime.datetime(
+        2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)
+    publish_time_2 = datetime.datetime(
+        2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc)
+    publish_time_3 = Timestamp.from_utc_datetime(
+        datetime.datetime(
+            2018, 3, 12, 13, 39, 3, 456789, tzinfo=datetime.timezone.utc))
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch('apache_beam.io.ReadFromPubSub',
+                      FakeReadFromPubSub(
+                          topic='my_topic',
+                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'},
+                                                  publish_time=publish_time_1),
+                                    PubsubMessage(b'msg2', {'attr': 'value2'},
+                                                  publish_time=publish_time_2),
+                                    PubsubMessage(b'msg3', {'attr': 'value3'},
+                                                  publish_time=publish_time_3),
+                                    PubsubMessage(b'msg4',
+                                                  {'attr': 'value4'})])):
+        result = p | YamlTransform(
+            '''
+            type: ReadFromPubSub
+            config:
+              topic: my_topic
+              format: RAW
+              publish_time_field: publish_time
+            ''')
+        assert_that(
+            result,
+            equal_to([
+                beam.Row(
+                    payload=b'msg1',
+                    publish_time=Timestamp.from_utc_datetime(publish_time_1)),
+                beam.Row(
+                    payload=b'msg2',
+                    publish_time=Timestamp.from_utc_datetime(publish_time_2)),
+                beam.Row(payload=b'msg3', publish_time=publish_time_3),
+                beam.Row(payload=b'msg4', publish_time=None)
+            ]))
+
+  def test_read_with_attributes_and_publish_time_field(self):
+    publish_time_1 = Timestamp.from_utc_datetime(
+        datetime.datetime(
+            2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc))
+    publish_time_2 = Timestamp.from_utc_datetime(
+        datetime.datetime(
+            2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc))
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch('apache_beam.io.ReadFromPubSub',
+                      FakeReadFromPubSub(
+                          topic='my_topic',
+                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'},
+                                                  publish_time=publish_time_1),
+                                    PubsubMessage(b'msg2', {'attr': 'value2'},
+                                                  publish_time=publish_time_2)
+                                    ])):
+        result = p | YamlTransform(
+            '''
+            type: ReadFromPubSub
+            config:
+              topic: my_topic
+              format: RAW
+              attributes: [attr]
+              attributes_map: attrMap
+              publish_time_field: publish_time
+            ''')
+        assert_that(
+            result,
+            equal_to([
+                beam.Row(
+                    payload=b'msg1',
+                    attr='value1',
+                    attrMap={'attr': 'value1'},
+                    publish_time=publish_time_1),
+                beam.Row(
+                    payload=b'msg2',
+                    attr='value2',
+                    attrMap={'attr': 'value2'},
+                    publish_time=publish_time_2)
+            ]))
+
+  def test_read_with_empty_publish_time_field(self):
+    for publish_time_field in ('', '   '):
+      with self.subTest(publish_time_field=publish_time_field):
+        with self.assertRaisesRegex(
+            ValueError, 'publish_time_field must be a non-empty field name'):
+          with beam.Pipeline(
+              options=beam.options.pipeline_options.PipelineOptions(
+                  pickle_library='cloudpickle')) as p:
+            _ = p | YamlTransform(
+                '''
+                type: ReadFromPubSub
+                config:
+                  topic: my_topic
+                  format: RAW
+                  publish_time_field: "%s"
+                ''' % publish_time_field)
+
   def test_read_with_id_attribute(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:

Reply via email to