Add coder info to pubsub io
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5852d21 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5852d21 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5852d21 Branch: refs/heads/gearpump-runner Commit: b5852d212cab060321c43a5800f8585aa3649aec Parents: 0a0a1bc Author: Vikas Kedigehalli <vika...@google.com> Authored: Wed Jun 7 16:28:18 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Wed Jun 7 22:55:00 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/pubsub.py | 32 +++++++++++++++----- sdks/python/apache_beam/io/gcp/pubsub_test.py | 28 +++++++++++++++-- .../runners/dataflow/dataflow_runner.py | 23 ++++++++++---- 3 files changed, 67 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 1ba8ac0..40326e1 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -40,13 +40,15 @@ __all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub', class ReadStringsFromPubSub(PTransform): """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.""" - def __init__(self, topic, subscription=None, id_label=None): + def __init__(self, topic=None, subscription=None, id_label=None): """Initializes ``ReadStringsFromPubSub``. Attributes: - topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". - subscription: Optional existing Cloud Pub/Sub subscription to use in the - form "projects/<project>/subscriptions/<subscription>". + topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". 
If + provided then subscription must be None. + subscription: Existing Cloud Pub/Sub subscription to use in the + form "projects/<project>/subscriptions/<subscription>". If provided then + topic must be None. id_label: The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for @@ -55,6 +57,12 @@ class ReadStringsFromPubSub(PTransform): case, deduplication of the stream will be strictly best effort. """ super(ReadStringsFromPubSub, self).__init__() + if topic and subscription: + raise ValueError("Only one of topic or subscription should be provided.") + + if not (topic or subscription): + raise ValueError("Either a topic or subscription must be provided.") + self._source = _PubSubPayloadSource( topic, subscription=subscription, @@ -90,9 +98,11 @@ class _PubSubPayloadSource(dataflow_io.NativeSource): """Source for the payload of a message as bytes from a Cloud Pub/Sub topic. Attributes: - topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". - subscription: Optional existing Cloud Pub/Sub subscription to use in the - form "projects/<project>/subscriptions/<subscription>". + topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If + provided then subscription must be None. + subscription: Existing Cloud Pub/Sub subscription to use in the + form "projects/<project>/subscriptions/<subscription>". If provided then + topic must be None. id_label: The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for @@ -101,7 +111,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource): case, deduplication of the stream will be strictly best effort. 
""" - def __init__(self, topic, subscription=None, id_label=None): + def __init__(self, topic=None, subscription=None, id_label=None): + # we are using this coder explicitly for portability reasons of PubsubIO + # across implementations in languages. + self.coder = coders.BytesCoder() self.topic = topic self.subscription = subscription self.id_label = id_label @@ -131,6 +144,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink): """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic.""" def __init__(self, topic): + # we are using this coder explicitly for portability reasons of PubsubIO + # across implementations in languages. + self.coder = coders.BytesCoder() self.topic = topic @property http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 322d08a..cf14e8c 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -34,9 +34,9 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher class TestReadStringsFromPubSub(unittest.TestCase): - def test_expand(self): + def test_expand_with_topic(self): p = TestPipeline() - pcoll = p | ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label') + pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label') # Ensure that the output type is str self.assertEqual(unicode, pcoll.element_type) @@ -47,9 +47,33 @@ class TestReadStringsFromPubSub(unittest.TestCase): # Ensure that the properties passed through correctly source = read_pcoll.producer.transform.source self.assertEqual('a_topic', source.topic) + self.assertEqual('a_label', source.id_label) + + def test_expand_with_subscription(self): + p = TestPipeline() + pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label') + # Ensure that the output 
type is str + self.assertEqual(unicode, pcoll.element_type) + + # Ensure that the type on the intermediate read output PCollection is bytes + read_pcoll = pcoll.producer.inputs[0] + self.assertEqual(bytes, read_pcoll.element_type) + + # Ensure that the properties passed through correctly + source = read_pcoll.producer.transform.source self.assertEqual('a_subscription', source.subscription) self.assertEqual('a_label', source.id_label) + def test_expand_with_both_topic_and_subscription(self): + with self.assertRaisesRegexp( + ValueError, "Only one of topic or subscription should be provided."): + ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label') + + def test_expand_with_no_topic_or_subscription(self): + with self.assertRaisesRegexp( + ValueError, "Either a topic or subscription must be provided."): + ReadStringsFromPubSub(None, None, 'a_label') + class TestWriteStringsToPubSub(unittest.TestCase): def test_expand(self): http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 3fc8983..d9aa1bf 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -618,10 +618,12 @@ class DataflowRunner(PipelineRunner): if not standard_options.streaming: raise ValueError('PubSubPayloadSource is currently available for use ' 'only in streaming pipelines.') - step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic) - if transform.source.subscription: + # Only one of topic or subscription should be set. 
+ if transform.source.topic: + step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic) + elif transform.source.subscription: step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, - transform.source.topic) + transform.source.subscription) if transform.source.id_label: step.add_property(PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label) @@ -639,7 +641,12 @@ class DataflowRunner(PipelineRunner): # step should be the type of value outputted by each step. Read steps # automatically wrap output values in a WindowedValue wrapper, if necessary. # This is also necessary for proper encoding for size estimation. - coder = coders.WindowedValueCoder(transform._infer_output_coder()) # pylint: disable=protected-access + # Using a GlobalWindowCoder as a place holder instead of the default + # PickleCoder because GlobalWindowCoder is known coder. + # TODO(robertwb): Query the collection for the windowfn to extract the + # correct coder. + coder = coders.WindowedValueCoder(transform._infer_output_coder(), + coders.coders.GlobalWindowCoder()) # pylint: disable=protected-access step.encoding = self._get_cloud_encoding(coder) step.add_property( @@ -708,8 +715,12 @@ class DataflowRunner(PipelineRunner): step.add_property(PropertyNames.FORMAT, transform.sink.format) # Wrap coder in WindowedValueCoder: this is necessary for proper encoding - # for size estimation. - coder = coders.WindowedValueCoder(transform.sink.coder) + # for size estimation. Using a GlobalWindowCoder as a place holder instead + # of the default PickleCoder because GlobalWindowCoder is known coder. + # TODO(robertwb): Query the collection for the windowfn to extract the + # correct coder. + coder = coders.WindowedValueCoder(transform.sink.coder, + coders.coders.GlobalWindowCoder()) step.encoding = self._get_cloud_encoding(coder) step.add_property(PropertyNames.ENCODING, step.encoding) step.add_property(