This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a3f9e0e7d96 Add publish_time_field to ReadFromPubSub YAML transform (#38985)
a3f9e0e7d96 is described below
commit a3f9e0e7d96a0d667dd75494e2079e18e20cc821
Author: Lalit Yadav <[email protected]>
AuthorDate: Wed Jun 17 19:16:03 2026 -0500
Add publish_time_field to ReadFromPubSub YAML transform (#38985)
* Add publish_time_field to ReadFromPubSub YAML transform
* Format YAML PubSub publish time test
* Fix YAML PubSub publish time handling
---
sdks/python/apache_beam/yaml/yaml_io.py | 21 +++++-
sdks/python/apache_beam/yaml/yaml_io_test.py | 103 +++++++++++++++++++++++++++
2 files changed, 121 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index 989661a6eae..77cbc41def3 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -46,6 +46,7 @@ from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import schemas
+from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml import json_utils
from apache_beam.yaml import yaml_errors
from apache_beam.yaml import yaml_provider
@@ -316,7 +317,8 @@ def read_from_pubsub(
attributes: Optional[Iterable[str]] = None,
attributes_map: Optional[str] = None,
id_attribute: Optional[str] = None,
- timestamp_attribute: Optional[str] = None):
+ timestamp_attribute: Optional[str] = None,
+ publish_time_field: Optional[str] = None):
"""Reads messages from Cloud Pub/Sub.
Args:
@@ -366,14 +368,19 @@ def read_from_pubsub(
``2015-10-29T23:41:41.123Z``. The sub-second component of the
timestamp is optional, and digits beyond the first three (i.e., time
units smaller than milliseconds) may be ignored.
+ publish_time_field: Field to add to output messages with the Pub/Sub
+ message publish time. If None, no such field is added.
"""
if topic and subscription:
raise TypeError('Only one of topic and subscription may be specified.')
elif not topic and not subscription:
raise TypeError('One of topic or subscription may be specified.')
+ if publish_time_field is not None and not publish_time_field.strip():
+ raise ValueError('publish_time_field must be a non-empty field name.')
+ has_publish_time_field = publish_time_field is not None
payload_schema, parser = _create_parser(format, schema)
extra_fields: list[schema_pb2.Field] = []
- if not attributes and not attributes_map:
+ if not attributes and not attributes_map and not has_publish_time_field:
mapper = lambda msg: parser(msg)
else:
if isinstance(attributes, str):
@@ -384,6 +391,9 @@ def read_from_pubsub(
if attributes_map:
extra_fields.append(
schemas.schema_field(attributes_map, Mapping[str, str]))
+ if has_publish_time_field:
+ extra_fields.append(
+ schemas.schema_field(publish_time_field, Optional[Timestamp]))
def mapper(msg):
values = parser(msg.data).as_dict()
@@ -393,6 +403,10 @@ def read_from_pubsub(
values[attr] = msg.attributes[attr]
if attributes_map:
values[attributes_map] = msg.attributes
+ if has_publish_time_field:
+ values[publish_time_field] = (
+ Timestamp.of(msg.publish_time)
+ if msg.publish_time is not None else None)
return beam.Row(**values)
output = (
@@ -400,7 +414,8 @@ def read_from_pubsub(
| beam.io.ReadFromPubSub(
topic=topic,
subscription=subscription,
- with_attributes=bool(attributes or attributes_map),
+ with_attributes=bool(
+ attributes or attributes_map or has_publish_time_field),
id_label=id_attribute,
timestamp_attribute=timestamp_attribute)
| 'ParseMessage' >> beam.Map(mapper))
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index e6219277bf5..250a54689f5 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import datetime
import io
import json
import logging
@@ -32,6 +33,7 @@ from apache_beam.testing.util import AssertThat
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints import schemas as schema_utils
+from apache_beam.utils.timestamp import Timestamp
from apache_beam.yaml.yaml_transform import YamlTransform
try:
@@ -181,6 +183,107 @@ class YamlPubSubTest(unittest.TestCase):
beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
]))
+ def test_read_with_publish_time_field(self):
+ publish_time_1 = datetime.datetime(
+ 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc)
+ publish_time_2 = datetime.datetime(
+ 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc)
+ publish_time_3 = Timestamp.from_utc_datetime(
+ datetime.datetime(
+ 2018, 3, 12, 13, 39, 3, 456789, tzinfo=datetime.timezone.utc))
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ with mock.patch('apache_beam.io.ReadFromPubSub',
+ FakeReadFromPubSub(
+ topic='my_topic',
+ messages=[PubsubMessage(b'msg1', {'attr': 'value1'},
+ publish_time=publish_time_1),
+ PubsubMessage(b'msg2', {'attr': 'value2'},
+ publish_time=publish_time_2),
+ PubsubMessage(b'msg3', {'attr': 'value3'},
+ publish_time=publish_time_3),
+ PubsubMessage(b'msg4',
+ {'attr': 'value4'})])):
+ result = p | YamlTransform(
+ '''
+ type: ReadFromPubSub
+ config:
+ topic: my_topic
+ format: RAW
+ publish_time_field: publish_time
+ ''')
+ assert_that(
+ result,
+ equal_to([
+ beam.Row(
+ payload=b'msg1',
+ publish_time=Timestamp.from_utc_datetime(publish_time_1)),
+ beam.Row(
+ payload=b'msg2',
+ publish_time=Timestamp.from_utc_datetime(publish_time_2)),
+ beam.Row(payload=b'msg3', publish_time=publish_time_3),
+ beam.Row(payload=b'msg4', publish_time=None)
+ ]))
+
+ def test_read_with_attributes_and_publish_time_field(self):
+ publish_time_1 = Timestamp.from_utc_datetime(
+ datetime.datetime(
+ 2018, 3, 12, 13, 37, 1, 234567, tzinfo=datetime.timezone.utc))
+ publish_time_2 = Timestamp.from_utc_datetime(
+ datetime.datetime(
+ 2018, 3, 12, 13, 38, 2, 345678, tzinfo=datetime.timezone.utc))
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ with mock.patch('apache_beam.io.ReadFromPubSub',
+ FakeReadFromPubSub(
+ topic='my_topic',
+ messages=[PubsubMessage(b'msg1', {'attr': 'value1'},
+ publish_time=publish_time_1),
+ PubsubMessage(b'msg2', {'attr': 'value2'},
+ publish_time=publish_time_2)
+ ])):
+ result = p | YamlTransform(
+ '''
+ type: ReadFromPubSub
+ config:
+ topic: my_topic
+ format: RAW
+ attributes: [attr]
+ attributes_map: attrMap
+ publish_time_field: publish_time
+ ''')
+ assert_that(
+ result,
+ equal_to([
+ beam.Row(
+ payload=b'msg1',
+ attr='value1',
+ attrMap={'attr': 'value1'},
+ publish_time=publish_time_1),
+ beam.Row(
+ payload=b'msg2',
+ attr='value2',
+ attrMap={'attr': 'value2'},
+ publish_time=publish_time_2)
+ ]))
+
+ def test_read_with_empty_publish_time_field(self):
+ for publish_time_field in ('', ' '):
+ with self.subTest(publish_time_field=publish_time_field):
+ with self.assertRaisesRegex(
+ ValueError, 'publish_time_field must be a non-empty field name'):
+ with beam.Pipeline(
+ options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ _ = p | YamlTransform(
+ '''
+ type: ReadFromPubSub
+ config:
+ topic: my_topic
+ format: RAW
+ publish_time_field: "%s"
+ ''' % publish_time_field)
+
def test_read_with_id_attribute(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p: