[ https://issues.apache.org/jira/browse/BEAM-3744?focusedWorklogId=126346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126346 ]
ASF GitHub Bot logged work on BEAM-3744:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jul/18 23:15
            Start Date: 23/Jul/18 23:15
    Worklog Time Spent: 10m
      Work Description: udim commented on a change in pull request #5952: [BEAM-3744] Python PubSub API Fixes and Tests
URL: https://github.com/apache/beam/pull/5952#discussion_r204581110

##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -155,45 +175,94 @@ def to_runner_api_parameter(self, context):
     return self.to_runner_api_pickled(context)


-class ReadStringsFromPubSub(PTransform):
-  """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
-
-  Outputs elements of type ``unicode``, decoded from UTF-8.
+@deprecated(since='2.6.0', extra_message='Use ReadFromPubSub instead.')
+def ReadStringsFromPubSub(topic=None, subscription=None, id_label=None):
+  return _ReadStringsFromPubSub(topic, subscription, id_label)

-  This class is deprecated.
-  """

+class _ReadStringsFromPubSub(PTransform):
+  """This class is deprecated. Use ``ReadFromPubSub`` instead."""

   def __init__(self, topic=None, subscription=None, id_label=None):
-    super(ReadStringsFromPubSub, self).__init__()
+    super(_ReadStringsFromPubSub, self).__init__()
     self.topic = topic
     self.subscription = subscription
     self.id_label = id_label

   def expand(self, pvalue):
     p = (pvalue.pipeline
-         | ReadFromPubSub(self.topic, self.subscription, self.id_label)
+         | ReadFromPubSub(self.topic, self.subscription, self.id_label,
+                          with_attributes=False)
          | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
     p.element_type = basestring
     return p


-class WriteStringsToPubSub(PTransform):
-  """A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
+@deprecated(since='2.6.0', extra_message='Use WriteToPubSub instead.')
+def WriteStringsToPubSub(topic):
+  return _WriteStringsToPubSub(topic)
+
+
+class _WriteStringsToPubSub(PTransform):
+  """This class is deprecated. Use ``WriteToPubSub`` instead."""

   def __init__(self, topic):
-    """Initializes ``WriteStringsToPubSub``.
+    """Initializes ``_WriteStringsToPubSub``.

     Attributes:
       topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
     """
-    super(WriteStringsToPubSub, self).__init__()
-    self._sink = _PubSubPayloadSink(topic)
+    super(_WriteStringsToPubSub, self).__init__()
+    self._sink = _PubSubSink(topic, id_label=None, with_attributes=False,
+                             timestamp_attribute=None)

   def expand(self, pcoll):
     pcoll = pcoll | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)

+
+class WriteToPubSub(PTransform):
+  """A ``PTransform`` for writing messages to Cloud Pub/Sub."""
+  # Implementation note: This ``PTransform`` is overridden by DirectRunner.
+
+  def __init__(self, topic, with_attributes, id_label=None,
+               timestamp_attribute=None):
+    """Initializes ``WriteToPubSub``.
+
+    Args:
+      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
+      with_attributes:
+        True - input elements will be :class:`~PubsubMessage` objects.
+        False - input elements will be of type ``six.binary_type`` (message
+          data only).
+      id_label: If set, will set an attribute for each Cloud Pub/Sub message
+        with the given name and a unique value. This attribute can then be used
+        in a ReadFromPubSub PTransform to deduplicate messages.
+      timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
+        message with the given name and the message's publish time as the value.
+    """
+    super(WriteToPubSub, self).__init__()
+    self.with_attributes = with_attributes
+    self.id_label = id_label
+    self.timestamp_attribute = timestamp_attribute
+    self._sink = _PubSubSink(topic, id_label, with_attributes,
+                             timestamp_attribute)
+
+  @staticmethod
+  def to_proto(element):
+    if not isinstance(element, PubsubMessage):
+      raise TypeError('Unexpected element. Type: %s (expected: PubsubMessage), '
+                      'value: %r' % (type(element), element))
+    return element._to_proto()
+
+  def expand(self, pcoll):
+    if self.with_attributes:
+      pcoll = pcoll | 'ToProtobuf' >> Map(self.to_proto)
+    pcoll.element_type = six.binary_type

Review comment:
  After some investigation, I believe this is correct. The output of this mapping is a byte string produced by the proto object's `SerializeToString()`. On Dataflow, this string is passed to a runner harness written in Java, so it must be a serialized protobuf for the harness to understand it.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 126346)
    Time Spent: 9h 40m  (was: 9.5h)

> Support full PubsubMessages
> ---------------------------
>
>                 Key: BEAM-3744
>                 URL: https://issues.apache.org/jira/browse/BEAM-3744
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Critical
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Tracking changes to Pubsub support in Python SDK.