Repository: beam Updated Branches: refs/heads/python-sdk c6420df97 -> e3849af8c
Fix read/write display data Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3 Branch: refs/heads/python-sdk Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20 Parents: c6420df Author: Pablo <pabl...@google.com> Authored: Fri Jan 13 11:25:36 2017 -0800 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Jan 26 14:51:56 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/avroio_test.py | 6 ---- sdks/python/apache_beam/io/fileio.py | 10 ++++++- sdks/python/apache_beam/io/fileio_test.py | 2 -- sdks/python/apache_beam/io/iobase.py | 38 +++++++++++++++++--------- sdks/python/apache_beam/io/textio.py | 25 +++++++++-------- sdks/python/apache_beam/io/textio_test.py | 30 -------------------- 6 files changed, 47 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index aed468d..d2fb1d1 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'), DisplayDataItemMatcher( - 'shards', - 0), - DisplayDataItemMatcher( 'codec', 'null'), DisplayDataItemMatcher( @@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'), DisplayDataItemMatcher( - 'shards', - 0), - DisplayDataItemMatcher( 'codec', 'deflate'), DisplayDataItemMatcher( http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 52f31c6..f67dca9 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -547,7 +547,8 @@ class FileSink(iobase.Sink): def display_data(self): return {'shards': - DisplayDataItem(self.num_shards, label='Number of Shards'), + DisplayDataItem(self.num_shards, + label='Number of Shards').drop_if_default(0), 'compression': DisplayDataItem(str(self.compression_type)), 'file_pattern': @@ -787,6 +788,13 @@ class TextFileSink(FileSink): '\'textio.WriteToText()\' instead of directly ' 'instantiating a TextFileSink object.') + def display_data(self): + dd_parent = super(TextFileSink, self).display_data() + dd_parent['append_newline'] = DisplayDataItem( + self.append_trailing_newlines, + label='Append Trailing New Lines') + return dd_parent + def write_encoded_record(self, file_handle, encoded_value): """Writes a single encoded record.""" file_handle.write(encoded_value) http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index ad77dc5..6c33f53 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase): dd = DisplayData.create_from(sink) expected_items = [ DisplayDataItemMatcher( - 'shards', 0), - DisplayDataItemMatcher( 'compression', 'auto'), DisplayDataItemMatcher( 'file_pattern', http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 12af3b6..1266ed3 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'WriteBundles' >> core.Map( - _write_keyed_bundle, self.sink, + | 'WriteBundles' >> core.ParDo( + _WriteKeyedBundleDoFn(self.sink), AsSingleton(init_result_coll))) else: min_shards = 1 write_result_coll = (pcoll | 'WriteBundles' >> - core.ParDo( - _WriteBundleDoFn(), self.sink, - AsSingleton(init_result_coll)) + core.ParDo(_WriteBundleDoFn(self.sink), + AsSingleton(init_result_coll)) | 'Pair' >> core.Map(lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() @@ -788,12 +787,16 @@ class _WriteBundleDoFn(core.DoFn): Opens a writer at the first element and closes the writer at finish_bundle(). """ - def __init__(self): + def __init__(self, sink): self.writer = None + self.sink = sink - def process(self, context, sink, init_result): + def display_data(self): + return {'sink_dd': self.sink} + + def process(self, context, init_result): if self.writer is None: - self.writer = sink.open_writer(init_result, str(uuid.uuid4())) + self.writer = self.sink.open_writer(init_result, str(uuid.uuid4())) self.writer.write(context.element) def finish_bundle(self, context, *args, **kwargs): @@ -801,11 +804,20 @@ class _WriteBundleDoFn(core.DoFn): yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP) -def _write_keyed_bundle(bundle, sink, init_result): - writer = sink.open_writer(init_result, str(uuid.uuid4())) - for element in bundle[1]: # values - writer.write(element) - return window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP) +class _WriteKeyedBundleDoFn(core.DoFn): + + def __init__(self, sink): + self.sink = sink + + def display_data(self): + return {'sink_dd': self.sink} + + def process(self, context, init_result): + bundle = context.element + writer = self.sink.open_writer(init_result, str(uuid.uuid4())) + for element in bundle[1]: # values + writer.write(element) + return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)] def _finalize_write(_, sink, init_result, write_results, min_shards): http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 4cdab12..0a593df 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -83,6 +83,19 @@ class _TextSource(filebasedsource.FileBasedSource): self._coder = coder self._buffer_size = buffer_size + def display_data(self): + parent_dd = super(_TextSource, self).display_data() + parent_dd['strip_newline'] = DisplayDataItem( + self._strip_trailing_newlines, + label='Strip Trailing New Lines') + parent_dd['buffer_size'] = DisplayDataItem( + self._buffer_size, + label='Buffer Size') + parent_dd['coder'] = DisplayDataItem( + self._coder.__class__, + label='Coder') + return parent_dd + def read_records(self, file_name, range_tracker): start_offset = range_tracker.start_position() read_buffer = _TextSource.ReadBuffer('', 0) @@ -252,11 +265,6 @@ class ReadFromText(PTransform): def expand(self, pvalue): return pvalue.pipeline | Read(self._source) - def display_data(self): - return {'source_dd': self._source, - 'strip_newline': DisplayDataItem(self._strip_trailing_newlines, - label='Strip Trailing New Lines')} - class WriteToText(PTransform): """A PTransform for writing to text files.""" @@ -302,16 +310,9 @@ class WriteToText(PTransform): compression. """ - self._append_trailing_newlines = append_trailing_newlines self._sink = _TextSink(file_path_prefix, file_name_suffix, append_trailing_newlines, num_shards, shard_name_template, coder, compression_type) def expand(self, pcoll): return pcoll | Write(self._sink) - - def display_data(self): - return {'sink_dd': self._sink, - 'append_newline': DisplayDataItem( - self._append_trailing_newlines, - label='Append Trailing New Lines')} http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 4b85584..07c6d9c 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -25,8 +25,6 @@ import os import tempfile import unittest -import hamcrest as hc - import apache_beam as beam import apache_beam.io.source_test_utils as source_test_utils @@ -45,9 +43,6 @@ from apache_beam.io.fileio import CompressionTypes from apache_beam.test_pipeline import TestPipeline -from apache_beam.transforms.display import DisplayData -from apache_beam.transforms.display_test import DisplayDataItemMatcher - from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -294,15 +289,6 @@ class TextSourceTest(unittest.TestCase): splits[0].source, splits[0].start_position, splits[0].stop_position, perform_multi_threaded_test=False) - def test_read_display_data(self): - read = ReadFromText('prefix', validate=False) - dd = DisplayData.create_from(read) - expected_items = [ - DisplayDataItemMatcher('compression', 'auto'), - DisplayDataItemMatcher('file_pattern', 'prefix'), - DisplayDataItemMatcher('strip_newline', True)] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - def test_dataflow_single_file(self): file_name, expected_data = write_data(5) assert len(expected_data) == 5 @@ -506,22 +492,6 @@ class TextSinkTest(unittest.TestCase): with gzip.GzipFile(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) - def test_write_display_data(self): - write = WriteToText('prefix') - dd = DisplayData.create_from(write) - expected_items = [ - DisplayDataItemMatcher( - 'append_newline', True), - DisplayDataItemMatcher( - 'compression', 'auto'), - DisplayDataItemMatcher( - 'shards', 0), - DisplayDataItemMatcher( - 'file_pattern', - '{}{}'.format('prefix', - '-%(shard_num)05d-of-%(num_shards)05d'))] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - def test_write_dataflow(self): pipeline = TestPipeline() pcoll = pipeline | beam.core.Create(self.lines)