Repository: incubator-beam Updated Branches: refs/heads/python-sdk cf026bb55 -> ea642428f
Adding display data to sink, sources, and parallel-do operations. - PubSub, BigQuery, NativeFile, FileBasedSource - Write, Read, ParDo transforms - DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c1043ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c1043ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c1043ae Branch: refs/heads/python-sdk Commit: 3c1043aeadf7a8ce08c174b76fa975e0bf78bae8 Parents: cf026bb Author: Pablo <pabl...@google.com> Authored: Thu Oct 27 12:07:02 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Nov 9 11:59:47 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/bigquery.py | 32 +++++ sdks/python/apache_beam/io/bigquery_test.py | 68 ++++++++- sdks/python/apache_beam/io/filebasedsource.py | 6 + .../apache_beam/io/filebasedsource_test.py | 44 ++++-- sdks/python/apache_beam/io/fileio.py | 20 +++ sdks/python/apache_beam/io/fileio_test.py | 62 ++++++++- sdks/python/apache_beam/io/iobase.py | 12 +- sdks/python/apache_beam/io/pubsub.py | 15 ++ sdks/python/apache_beam/io/pubsub_test.py | 62 +++++++++ .../runners/dataflow/native_io/iobase.py | 5 +- sdks/python/apache_beam/transforms/core.py | 24 +++- sdks/python/apache_beam/transforms/display.py | 72 +++++++--- .../apache_beam/transforms/display_test.py | 138 ++++++++++++++----- .../apache_beam/transforms/ptransform_test.py | 58 ++++++++ 14 files changed, 548 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 9c1ee27..f0e88a6 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -118,6 +118,7 @@ from apache_beam.internal import auth from apache_beam.internal.json_value import from_json_value from apache_beam.internal.json_value import to_json_value from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils import retry from apache_beam.utils.options import GoogleCloudOptions @@ -342,6 +343,23 @@ class BigQuerySource(dataflow_io.NativeSource): self.validate = validate self.coder = coder or RowAsDictJsonCoder() + def display_data(self): + if self.query is not None: + res = {'query': DisplayDataItem(self.query, label='Query')} + else: + if self.table_reference.projectId is not None: + tableSpec = '{}:{}.{}'.format(self.table_reference.projectId, + self.table_reference.datasetId, + self.table_reference.tableId) + else: + tableSpec = '{}.{}'.format(self.table_reference.datasetId, + self.table_reference.tableId) + res = {'table': DisplayDataItem(tableSpec, label='Table')} + + res['validation'] = DisplayDataItem(self.validate, + label='Validation Enabled') + return res + @property def format(self): """Source format name required for remote execution.""" @@ -434,6 +452,20 @@ class BigQuerySink(dataflow_io.NativeSink): self.validate = validate self.coder = coder or RowAsDictJsonCoder() + def display_data(self): + res = {} + if self.table_reference is not None: + tableSpec = '{}.{}'.format(self.table_reference.datasetId, + self.table_reference.tableId) + if self.table_reference.projectId is not None: + tableSpec = '{}:{}'.format(self.table_reference.projectId, + tableSpec) + res['table'] = DisplayDataItem(tableSpec, label='Table') + + res['validation'] = DisplayDataItem(self.validate, + label="Validation Enabled") + return res + def schema_as_json(self): """Returns the TableSchema associated with the sink as a JSON string.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index 86b599c..b0c3bbe 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -24,6 +24,7 @@ import datetime import unittest from apitools.base.py.exceptions import HttpError +import hamcrest as hc import mock import apache_beam as beam @@ -31,6 +32,8 @@ from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.options import PipelineOptions @@ -112,6 +115,39 @@ class TestTableRowJsonCoder(unittest.TestCase): class TestBigQuerySource(unittest.TestCase): + def test_display_data_item_on_validate_true(self): + source = beam.io.BigQuerySource('dataset.table', validate=True) + + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('validation', True), + DisplayDataItemMatcher('table', 'dataset.table')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_table_reference_display_data(self): + source = beam.io.BigQuerySource('dataset.table') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('validation', False), + DisplayDataItemMatcher('table', 'dataset.table')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + source = beam.io.BigQuerySource('project:dataset.table') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('validation', False), + DisplayDataItemMatcher('table', 'project:dataset.table')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + source = beam.io.BigQuerySource('xyz.com:project:dataset.table') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('validation', + False), + DisplayDataItemMatcher('table', + 'xyz.com:project:dataset.table')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_parse_table_reference(self): source = beam.io.BigQuerySource('dataset.table') self.assertEqual(source.table_reference.datasetId, 'dataset') @@ -127,20 +163,40 @@ class TestBigQuerySource(unittest.TestCase): self.assertEqual(source.table_reference.datasetId, 'dataset') self.assertEqual(source.table_reference.tableId, 'table') - def test_specify_query_without_table(self): source = beam.io.BigQuerySource(query='my_query') self.assertEqual(source.query, 'my_query') self.assertIsNone(source.table_reference) self.assertTrue(source.use_legacy_sql) + def test_query_only_display_data(self): + source = beam.io.BigQuerySource(query='my_query') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('validation', False), + DisplayDataItemMatcher('query', 'my_query')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_specify_query_sql_format(self): source = beam.io.BigQuerySource(query='my_query', use_legacy_sql=False) self.assertEqual(source.query, 'my_query') self.assertFalse(source.use_legacy_sql) + def test_specify_query_without_table(self): + source = beam.io.BigQuerySource(query='my_query') + self.assertEqual(source.query, 'my_query') + self.assertIsNone(source.table_reference) + class TestBigQuerySink(unittest.TestCase): + def test_table_spec_display_data(self): + sink = beam.io.BigQuerySink('dataset.table') + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher('table', 'dataset.table'), + DisplayDataItemMatcher('validation', False)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_parse_schema_descriptor(self): sink = beam.io.BigQuerySink( 'dataset.table', schema='s:STRING, n:INTEGER') @@ -150,9 +206,17 @@ class TestBigQuerySink(unittest.TestCase): field.name: field.type for field in sink.table_schema.fields} self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema) + def test_project_table_display_data(self): + sinkq = beam.io.BigQuerySink('PROJECT:dataset.table') + dd = DisplayData.create_from(sinkq) + expected_items = [ + DisplayDataItemMatcher('table', 'PROJECT:dataset.table'), + DisplayDataItemMatcher('validation', False)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_simple_schema_as_json(self): sink = beam.io.BigQuerySink( - 'dataset.table', schema='s:STRING, n:INTEGER') + 'PROJECT:dataset.table', schema='s:STRING, n:INTEGER') self.assertEqual( json.dumps({'fields': [ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index d9b907c..58ad118 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -35,6 +35,7 @@ from apache_beam.io import concat_source from apache_beam.io import fileio from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.transforms.display import DisplayDataItem MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25 @@ -91,6 +92,11 @@ class FileBasedSource(iobase.BoundedSource): # We can't split compressed files efficiently so turn off splitting. self._splittable = False + def display_data(self): + return {'filePattern': DisplayDataItem(self._pattern, label="File Pattern"), + 'compression': DisplayDataItem(str(self._compression_type), + label='Compression Type')} + def _get_concat_source(self): if self._concat_source is None: single_file_sources = [] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index d07c1df..7bc31fd 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -25,6 +25,8 @@ import os import tempfile import unittest +import hamcrest as hc + import apache_beam as beam from apache_beam.io import filebasedsource from apache_beam.io import fileio @@ -36,6 +38,8 @@ from apache_beam.io.concat_source import ConcatSource from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource from apache_beam.io.filebasedsource import FileBasedSource +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -224,6 +228,16 @@ class TestFileBasedSource(unittest.TestCase): read_data = [record for record in fbs.read(range_tracker)] self.assertItemsEqual(expected_data, read_data) + def test_single_file_display_data(self): + file_name, _ = write_data(10) + fbs = LineSource(file_name) + dd = DisplayData.create_from(fbs) + expected_items = [ + DisplayDataItemMatcher('filePattern', file_name), + DisplayDataItemMatcher('compression', 'auto')] + hc.assert_that(dd.items, + hc.contains_inanyorder(*expected_items)) + def test_fully_read_file_pattern(self): pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4]) assert len(expected_data) == 40 @@ -510,8 +524,8 @@ class TestSingleFileSource(unittest.TestCase): def test_source_creation_fails_for_non_number_offsets(self): start_not_a_number_error = 'start_offset must be a number*' stop_not_a_number_error = 'stop_offset must be a number*' - - fbs = LineSource('dymmy_pattern') + file_name = 'dummy_pattern' + fbs = LineSource(file_name) with self.assertRaisesRegexp(TypeError, start_not_a_number_error): SingleFileSource( @@ -529,11 +543,21 @@ class TestSingleFileSource(unittest.TestCase): SingleFileSource( fbs, file_name='dummy_file', start_offset=None, stop_offset=100) + def test_source_creation_display_data(self): + file_name = 'dummy_pattern' + fbs = LineSource(file_name) + dd = DisplayData.create_from(fbs) + expected_items = [ + DisplayDataItemMatcher('compression', 'auto'), + DisplayDataItemMatcher('filePattern', file_name)] + hc.assert_that(dd.items, + hc.contains_inanyorder(*expected_items)) + def test_source_creation_fails_if_start_lg_stop(self): start_larger_than_stop_error = ( 'start_offset must be smaller than stop_offset*') - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') SingleFileSource( fbs, file_name='dummy_file', start_offset=99, stop_offset=100) with self.assertRaisesRegexp(ValueError, start_larger_than_stop_error): @@ -544,7 +568,7 @@ class TestSingleFileSource(unittest.TestCase): fbs, file_name='dummy_file', start_offset=100, stop_offset=100) def test_estimates_size(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') # Should simply return stop_offset - start_offset source = SingleFileSource( @@ -556,7 +580,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertEquals(90, source.estimate_size()) def test_read_range_at_beginning(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 @@ -567,7 +591,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertItemsEqual(expected_data[:4], read_data) def test_read_range_at_end(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 @@ -578,7 +602,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertItemsEqual(expected_data[-3:], read_data) def test_read_range_at_middle(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 @@ -589,7 +613,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertItemsEqual(expected_data[4:7], read_data) def test_produces_splits_desiredsize_large_than_size(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 @@ -605,7 +629,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertItemsEqual(expected_data, read_data) def test_produces_splits_desiredsize_smaller_than_size(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 @@ -623,7 +647,7 @@ class TestSingleFileSource(unittest.TestCase): self.assertItemsEqual(expected_data, read_data) def test_produce_split_with_start_and_end_positions(self): - fbs = LineSource('dymmy_pattern') + fbs = LineSource('dummy_pattern') file_name, expected_data = write_data(10) assert len(expected_data) == 10 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f74ac9c..669bfc9 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -34,6 +34,8 @@ from apache_beam import coders from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms.display import DisplayDataItem + __all__ = ['TextFileSource', 'TextFileSink'] @@ -56,6 +58,9 @@ class _CompressionType(object): def __ne__(self, other): return not self.__eq__(other) + def __str__(self): + return self.identifier + def __repr__(self): return '_CompressionType(%s)' % self.identifier @@ -161,6 +166,10 @@ class NativeFileSource(dataflow_io.NativeSource): self.coder = coder self.mime_type = mime_type + def display_data(self): + return {'filePattern': DisplayDataItem(self.file_path, + label="File Pattern")} + def __eq__(self, other): return (self.file_path == other.file_path and self.start_offset == other.start_offset and @@ -1005,6 +1014,17 @@ class NativeFileSink(dataflow_io.NativeSink): self.mime_type = mime_type self.compression_type = compression_type + def display_data(self): + file_name_pattern = '{}{}{}'.format(self.file_name_prefix, + self.shard_name_template, + self.file_name_suffix) + return {'filePattern': + DisplayDataItem(file_name_pattern, + label='File Name Pattern'), + 'compression': + DisplayDataItem(str(self.compression_type), + label='Compression Type')} + @property def path(self): return self.file_path http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 15daf04..7e6e60b 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -27,10 +27,14 @@ import tempfile import unittest import zlib +import hamcrest as hc + import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher # TODO: Add tests for file patterns (ie not just individual files) for both # uncompressed @@ -52,8 +56,9 @@ class TestTextFileSource(unittest.TestCase): output_lines, start_offset=None, end_offset=None): + file_name = self.create_temp_file('\n'.join(input_lines)) source = fileio.TextFileSource( - file_path=self.create_temp_file('\n'.join(input_lines)), + file_path=file_name, start_offset=start_offset, end_offset=end_offset) read_lines = [] @@ -61,6 +66,11 @@ class TestTextFileSource(unittest.TestCase): for line in reader: read_lines.append(line) self.assertEqual(read_lines, output_lines) + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('filePattern', file_name)] + hc.assert_that(dd.items, + hc.contains_inanyorder(*expected_items)) def progress_with_offsets(self, input_lines, @@ -592,6 +602,30 @@ class TestNativeTextFileSink(unittest.TestCase): with open(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) + def test_text_file_display_data(self): + sink = fileio.NativeTextFileSink(self.path) + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'filePattern', + '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), + DisplayDataItemMatcher( + 'compression', + 'auto')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_text_file_display_data_suffix(self): + sink = fileio.NativeTextFileSink(self.path, file_name_suffix='.pdf') + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'filePattern', + '{}{}{}'.format(self.path, '-SSSSS-of-NNNNN', '.pdf')), + DisplayDataItemMatcher( + 'compression', + 'auto')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_write_text_file_empty(self): sink = fileio.NativeTextFileSink(self.path) self._write_lines(sink, []) @@ -607,6 +641,19 @@ class TestNativeTextFileSink(unittest.TestCase): with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) + def test_display_data_gzip_file(self): + sink = fileio.NativeTextFileSink( + self.path, compression_type=fileio.CompressionTypes.GZIP) + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'filePattern', + '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), + DisplayDataItemMatcher( + 'compression', + 'gzip')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_write_text_gzip_file_auto(self): self.path = tempfile.NamedTemporaryFile(suffix='.gz').name sink = fileio.NativeTextFileSink(self.path) @@ -631,6 +678,19 @@ class TestNativeTextFileSink(unittest.TestCase): with bz2.BZ2File(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), self.lines) + def test_display_data_bzip2_file(self): + sink = fileio.NativeTextFileSink( + self.path, compression_type=fileio.CompressionTypes.BZIP2) + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher( + 'filePattern', + '{}{}'.format(self.path, '-SSSSS-of-NNNNN')), + DisplayDataItemMatcher( + 'compression', + 'bzip2')] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_write_text_bzip2_file_auto(self): self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name sink = fileio.NativeTextFileSink(self.path) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index fd46dd6..a0de131 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -42,6 +42,7 @@ from apache_beam.pvalue import AsSingleton from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.transforms import window +from apache_beam.transforms.display import HasDisplayData, DisplayDataItem # Encapsulates information about a bundle of a source generated when method @@ -65,7 +66,7 @@ SourceBundle = namedtuple( 'weight source start_position stop_position') -class BoundedSource(object): +class BoundedSource(HasDisplayData): """A source that reads a finite amount of input records. This class defines following operations which can be used to read the source @@ -670,6 +671,11 @@ class Read(ptransform.PTransform): else: return self.source.coder + def display_data(self): + return {'source': DisplayDataItem(self.source.__class__, + label='Read Source'), + 'source_dd': self.source} + class Write(ptransform.PTransform): """A ``PTransform`` that writes to a sink. @@ -712,6 +718,10 @@ class Write(ptransform.PTransform): super(Write, self).__init__(label) self.sink = sink + def display_data(self): + return {'sink': self.sink.__class__, + 'sink_dd': self.sink} + def apply(self, pcoll): from apache_beam.runners.dataflow.native_io import iobase as dataflow_io if isinstance(self.sink, dataflow_io.NativeSink): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/pubsub.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/pubsub.py b/sdks/python/apache_beam/io/pubsub.py index da81742..40bd368 100644 --- a/sdks/python/apache_beam/io/pubsub.py +++ b/sdks/python/apache_beam/io/pubsub.py @@ -24,6 +24,7 @@ from __future__ import absolute_import from apache_beam import coders from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms.display import DisplayDataItem class PubSubSource(dataflow_io.NativeSource): @@ -54,6 +55,17 @@ class PubSubSource(dataflow_io.NativeSource): """Source format name required for remote execution.""" return 'pubsub' + def display_data(self): + return {'idLabel': + DisplayDataItem(self.id_label, + label='ID Label Attribute').drop_if_none(), + 'topic': + DisplayDataItem(self.topic, + label='Pubsub Topic'), + 'subscription': + DisplayDataItem(self.subscription, + label='Pubsub Subscription').drop_if_none()} + def reader(self): raise NotImplementedError( 'PubSubSource is not supported in local execution.') @@ -71,6 +83,9 @@ class PubSubSink(dataflow_io.NativeSink): """Sink format name required for remote execution.""" return 'pubsub' + def display_data(self): + return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')} + def writer(self): raise NotImplementedError( 'PubSubSink is not supported in local execution.') http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/io/pubsub_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/pubsub_test.py b/sdks/python/apache_beam/io/pubsub_test.py new file mode 100644 index 0000000..828d233 --- /dev/null +++ b/sdks/python/apache_beam/io/pubsub_test.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for PubSub sources and sinks.""" + +import logging +import unittest + +import hamcrest as hc + +from apache_beam.io.pubsub import PubSubSource, PubSubSink +from apache_beam.transforms.display import DisplayData +from apache_beam.transforms.display_test import DisplayDataItemMatcher + + +class TestPubSubSource(unittest.TestCase): + + def test_display_data(self): + source = PubSubSource('a_topic', 'a_subscription', 'a_label') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('topic', 'a_topic'), + DisplayDataItemMatcher('subscription', 'a_subscription'), + DisplayDataItemMatcher('idLabel', 'a_label')] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_display_data_no_subscription(self): + source = PubSubSource('a_topic') + dd = DisplayData.create_from(source) + expected_items = [ + DisplayDataItemMatcher('topic', 'a_topic')] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + +class TestPubSubSink(unittest.TestCase): + def test_display_data(self): + sink = PubSubSink('a_topic') + dd = DisplayData.create_from(sink) + expected_items = [ + DisplayDataItemMatcher('topic', 'a_topic')] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 9621f4c..32da3a2 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -22,6 +22,7 @@ import logging from apache_beam import pvalue from apache_beam.transforms import ptransform +from apache_beam.transforms.display import HasDisplayData def _dict_printable_fields(dict_object, skip_fields): @@ -38,7 +39,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder', 'compression_type'] -class NativeSource(object): +class NativeSource(HasDisplayData): """A source implemented by Dataflow service. This class is to be only inherited by sources natively implemented by Cloud @@ -244,7 +245,7 @@ class DynamicSplitResultWithPosition(DynamicSplitResult): self.stop_position = stop_position -class NativeSink(object): +class NativeSink(HasDisplayData): """A sink implemented by Dataflow service. This class is to be only inherited by sinks natively implemented by Cloud http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3b5816a..3189de7 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -29,7 +29,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import util from apache_beam.transforms import ptransform from apache_beam.transforms import window -from apache_beam.transforms.display import HasDisplayData +from apache_beam.transforms.display import HasDisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import PTransformWithSideInputs from apache_beam.transforms.window import MIN_TIMESTAMP @@ -235,6 +235,16 @@ class CallableWrapperDoFn(DoFn): super(CallableWrapperDoFn, self).__init__() + def display_data(self): + # If the callable has a name, then it's likely a function, and + # we show its name. + # Otherwise, it might be an instance of a callable class. We + # show its class. + display_data_value = (self._fn.__name__ if hasattr(self._fn, '__name__') + else self._fn.__class__) + return {'fn': DisplayDataItem(display_data_value, + label='Transform Function')} + def __repr__(self): return 'CallableWrapperDoFn(%s)' % self._fn @@ -580,6 +590,11 @@ class ParDo(PTransformWithSideInputs): def process_argspec_fn(self): return self.fn.process_argspec_fn() + def display_data(self): + return {'fn': DisplayDataItem(self.fn.__class__, + label='Transform Function'), + 'fn_dd': self.fn} + def apply(self, pcoll): self.side_output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. @@ -696,6 +711,10 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name else: wrapper = lambda x: [fn(x)] + # TODO. What about callable classes? + if hasattr(fn, '__name__'): + wrapper.__name__ = fn.__name__ + # Proxy the type-hint information from the original function to this new # wrapped function. get_type_hints(wrapper).input_types = get_type_hints(fn).input_types @@ -739,6 +758,9 @@ def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name % (fn, 'first' if label is None else 'second')) wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else [] + # TODO: What about callable classes? + if hasattr(fn, '__name__'): + wrapper.__name__ = fn.__name__ # Proxy the type-hint information from the function being wrapped, setting the # output type to be the same as the input type. get_type_hints(wrapper).input_types = get_type_hints(fn).input_types http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index e93d560..365abaf 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -93,6 +93,8 @@ class DisplayData(object): continue if isinstance(element, DisplayDataItem): + if element.should_drop(): + continue element.key = key element.namespace = self.namespace self.items.append(element) @@ -132,6 +134,7 @@ class DisplayDataItem(object): typeDict = {str:'STRING', int:'INTEGER', float:'FLOAT', + bool: 'BOOLEAN', timedelta:'DURATION', datetime:'TIMESTAMP'} @@ -145,6 +148,40 @@ class DisplayDataItem(object): self.value = value self.url = url self.label = label + self._drop_if_none = False + self._drop_if_default = False + + def drop_if_none(self): + """ The item should be dropped if its value is None. + + Returns: + Returns self. + """ + self._drop_if_none = True + return self + + def drop_if_default(self, default): + """ The item should be dropped if its value is equal to its default. + + Returns: + Returns self. + """ + self._default = default + self._drop_if_default = True + return self + + def should_drop(self): + """ Return True if the item should be dropped, or False if it should not + be dropped. This depends on the drop_if_none, and drop_if_default calls. + + Returns: + True or False; depending on whether the item should be dropped or kept. + """ + if self._drop_if_none and self.value is None: + return True + if self._drop_if_default and self.value == self._default: + return True + return False def is_valid(self): """ Checks that all the necessary fields of the DisplayDataItem are @@ -164,24 +201,12 @@ class DisplayDataItem(object): 'Invalid DisplayDataItem. Value {} is of an unsupported type.' .format(self.value)) - def get_dict(self): - """ Returns the internal-API dictionary representing the DisplayDataItem. - - Returns: - A dictionary. The internal-API dictionary representing the - DisplayDataItem - - Raises: - ValueError: if the item is not valid. - """ - self.is_valid() - + def _get_dict(self): res = {'key': self.key, 'namespace': self.namespace, 'type': self.type if self.type != 'CLASS' else 'STRING'} # TODO: Python Class types should not be special-cased once # the Fn API is in. - if self.url is not None: res['url'] = self.url if self.shortValue is not None: @@ -191,19 +216,32 @@ class DisplayDataItem(object): res['value'] = self._format_value(self.value, self.type) return res + def get_dict(self): + """ Returns the internal-API dictionary representing the DisplayDataItem. + + Returns: + A dictionary. The internal-API dictionary representing the + DisplayDataItem + + Raises: + ValueError: if the item is not valid. + """ + self.is_valid() + return self._get_dict() + def __repr__(self): - return 'DisplayDataItem({})'.format(json.dumps(self.get_dict())) + return 'DisplayDataItem({})'.format(json.dumps(self._get_dict())) def __eq__(self, other): if isinstance(other, self.__class__): - return self.get_dict() == other.get_dict() + return self._get_dict() == other._get_dict() return False def __ne__(self, other): return not self == other def __hash__(self): - return hash(tuple(sorted(self.get_dict().items()))) + return hash(tuple(sorted(self._get_dict().items()))) @classmethod def _format_value(cls, value, type_): @@ -259,4 +297,6 @@ class DisplayDataItem(object): type_ = cls.typeDict.get(type(value)) if type_ is None: type_ = 'CLASS' if inspect.isclass(value) else None + if type_ is None and value is None: + type_ = 'STRING' return type_ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 87d0920..d514065 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -22,14 +22,72 @@ from __future__ import absolute_import from datetime import datetime import unittest +import hamcrest as hc +from hamcrest.core.base_matcher import BaseMatcher + import apache_beam as beam from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display import DisplayDataItem +class DisplayDataItemMatcher(BaseMatcher): + """ Matcher class for DisplayDataItems in unit tests. + """ + IGNORED = object() + + def __init__(self, key=IGNORED, value=IGNORED, + namespace=IGNORED, label=IGNORED, shortValue=IGNORED): + if all(member == DisplayDataItemMatcher.IGNORED for member in + [key, value, namespace, label, shortValue]): + raise ValueError('Must receive at least one item attribute to match') + + self.key = key + self.value = value + self.namespace = namespace + self.label = label + self.shortValue = shortValue + + def _matches(self, item): + if self.key != DisplayDataItemMatcher.IGNORED and item.key != self.key: + return False + if (self.namespace != DisplayDataItemMatcher.IGNORED and + item.namespace != self.namespace): + return False + if (self.value != DisplayDataItemMatcher.IGNORED and + item.value != self.value): + return False + if (self.label != DisplayDataItemMatcher.IGNORED and + item.label != self.label): + return False + if (self.shortValue != DisplayDataItemMatcher.IGNORED and + item.shortValue != self.shortValue): + return False + return True + + def describe_to(self, description): + descriptors = [] + if self.key != DisplayDataItemMatcher.IGNORED: + descriptors.append('key is {}'.format(self.key)) + if self.value != DisplayDataItemMatcher.IGNORED: + descriptors.append('value is {}'.format(self.value)) + if self.namespace != DisplayDataItemMatcher.IGNORED: + descriptors.append('namespace is {}'.format(self.namespace)) + if self.label != DisplayDataItemMatcher.IGNORED: + descriptors.append('label is {}'.format(self.label)) + if self.shortValue != DisplayDataItemMatcher.IGNORED: + descriptors.append('shortValue is {}'.format(self.shortValue)) + + item_description = '{}'.format(' and '.join(descriptors)) + description.append(item_description) + + class DisplayDataTest(unittest.TestCase): + def test_display_data_item_matcher(self): + with self.assertRaises(ValueError): + DisplayDataItemMatcher() + def test_inheritance_ptransform(self): class MyTransform(beam.PTransform): pass @@ -70,52 +128,58 @@ class DisplayDataTest(unittest.TestCase): now = datetime.now() fn = MyDoFn(my_display_data=now) dd = DisplayData.create_from(fn) - nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__) - expected_items = set([ - DisplayDataItem(namespace=nspace, - key='complex_url', - value='github.com', - label='The URL', - url='http://github.com'), - DisplayDataItem(namespace=nspace, - key='my_dd', - value=now), - DisplayDataItem(namespace=nspace, - key='python_class', - shortValue='HasDisplayData', - value='apache_beam.transforms.display.HasDisplayData'), - DisplayDataItem(namespace=nspace, - key='static_integer', - value=120), - DisplayDataItem(namespace=nspace, - key='static_string', - value='static me!'), - ]) - - self.assertEqual(set(dd.items), expected_items) - - def test_subcomponent(self): - class SpecialParDo(beam.PTransform): - def __init__(self, fn): - self.fn = fn - + expected_items = [ + DisplayDataItemMatcher(key='complex_url', + value='github.com', + namespace=nspace, + label='The URL'), + DisplayDataItemMatcher(key='my_dd', + value=now, + namespace=nspace), + DisplayDataItemMatcher(key='python_class', + value=HasDisplayData, + namespace=nspace, + shortValue='HasDisplayData'), + DisplayDataItemMatcher(key='static_integer', + value=120, + namespace=nspace), + DisplayDataItemMatcher(key='static_string', + value='static me!', + namespace=nspace)] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + + def test_drop_if_none(self): + class MyDoFn(beam.DoFn): def display_data(self): - return {'asubcomponent': self.fn} + return {'some_val': DisplayDataItem('something').drop_if_none(), + 'non_val': DisplayDataItem(None).drop_if_none(), + 'def_val': DisplayDataItem(True).drop_if_default(True), + 'nodef_val': DisplayDataItem(True).drop_if_default(False)} + + dd = DisplayData.create_from(MyDoFn()) + expected_items = [DisplayDataItemMatcher('some_val', + 'something'), + DisplayDataItemMatcher('nodef_val', + True)] + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + def test_subcomponent(self): class SpecialDoFn(beam.DoFn): def display_data(self): return {'dofn_value': 42} dofn = SpecialDoFn() - pardo = SpecialParDo(dofn) + pardo = beam.ParDo(dofn) dd = DisplayData.create_from(pardo) - nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__) - self.assertEqual(dd.items[0].get_dict(), - {"type": "INTEGER", - "namespace": nspace, - "value": 42, - "key": "dofn_value"}) + dofn_nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__) + pardo_nspace = '{}.{}'.format(pardo.__module__, pardo.__class__.__name__) + expected_items = [ + DisplayDataItemMatcher('dofn_value', 42, dofn_nspace), + DisplayDataItemMatcher('fn', SpecialDoFn, pardo_nspace)] + + hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) # TODO: Test __repr__ function http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index b99cd26..e3b1026 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -23,11 +23,13 @@ import operator import re import unittest +import hamcrest as hc import apache_beam as beam from apache_beam.pipeline import Pipeline import apache_beam.pvalue as pvalue import apache_beam.transforms.combiners as combine +from apache_beam.transforms.display import DisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.util import assert_that, equal_to import apache_beam.typehints as typehints @@ -663,6 +665,62 @@ class PTransformLabelsTest(unittest.TestCase): self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)') +class PTransformTestDisplayData(unittest.TestCase): + def test_map_named_function(self): + tr = beam.Map(len) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('len', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + def test_map_anonymous_function(self): + tr = beam.Map(lambda x: x) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('<lambda>', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + def test_flatmap_named_function(self): + tr = beam.FlatMap(list) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('list', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + def test_flatmap_anonymous_function(self): + tr = beam.FlatMap(lambda x: [x]) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('<lambda>', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + def test_filter_named_function(self): + tr = beam.Filter(sum) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('sum', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + def test_filter_anonymous_function(self): + tr = beam.Filter(lambda x: x // 30) + dd = DisplayData.create_from(tr) + nspace = 'apache_beam.transforms.core.CallableWrapperDoFn' + expected_item = DisplayDataItem('<lambda>', key='fn', + label='Transform Function', + namespace=nspace) + hc.assert_that(dd.items, hc.has_item(expected_item)) + + class PTransformTypeCheckTestCase(TypeHintTestCase): def assertStartswith(self, msg, prefix):