Repository: beam
Updated Branches:
  refs/heads/python-sdk c6420df97 -> e3849af8c


Fix read/write display data

Move display data for text reads and writes from the ReadFromText and
WriteToText wrappers onto the underlying source and sink, drop the
'shards' item while it still has its default of 0, and pass the sink
into the write DoFns so its display data is attached to the
'WriteBundles' step.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3

Branch: refs/heads/python-sdk
Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20
Parents: c6420df
Author: Pablo <pabl...@google.com>
Authored: Fri Jan 13 11:25:36 2017 -0800
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jan 26 14:51:56 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py |  6 ----
 sdks/python/apache_beam/io/fileio.py      | 10 ++++++-
 sdks/python/apache_beam/io/fileio_test.py |  2 --
 sdks/python/apache_beam/io/iobase.py      | 38 +++++++++++++++++---------
 sdks/python/apache_beam/io/textio.py      | 25 +++++++++--------
 sdks/python/apache_beam/io/textio_test.py | 30 --------------------
 6 files changed, 47 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index aed468d..d2fb1d1 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase):
             'file_pattern',
             'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'),
         DisplayDataItemMatcher(
-            'shards',
-            0),
-        DisplayDataItemMatcher(
             'codec',
             'null'),
         DisplayDataItemMatcher(
@@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase):
             'file_pattern',
             'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'),
         DisplayDataItemMatcher(
-            'shards',
-            0),
-        DisplayDataItemMatcher(
             'codec',
             'deflate'),
         DisplayDataItemMatcher(

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 52f31c6..f67dca9 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -547,7 +547,8 @@ class FileSink(iobase.Sink):
 
   def display_data(self):
     return {'shards':
-            DisplayDataItem(self.num_shards, label='Number of Shards'),
+            DisplayDataItem(self.num_shards,
+                            label='Number of Shards').drop_if_default(0),
             'compression':
             DisplayDataItem(str(self.compression_type)),
             'file_pattern':
@@ -787,6 +788,13 @@ class TextFileSink(FileSink):
                       '\'textio.WriteToText()\' instead of directly '
                       'instantiating a TextFileSink object.')
 
+  def display_data(self):
+    dd_parent = super(TextFileSink, self).display_data()
+    dd_parent['append_newline'] = DisplayDataItem(
+        self.append_trailing_newlines,
+        label='Append Trailing New Lines')
+    return dd_parent
+
   def write_encoded_record(self, file_handle, encoded_value):
     """Writes a single encoded record."""
     file_handle.write(encoded_value)
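
For context, the new drop_if_default(0) call marks the 'shards' item so
that it is omitted from the collected display data while num_shards still
has its default value of 0, which is why the 'shards' matchers disappear
from the tests above and below. A minimal sketch of that behaviour,
assuming only the Beam display classes already used in this diff (the
example sink class itself is illustrative):

    from apache_beam.transforms.display import DisplayData
    from apache_beam.transforms.display import DisplayDataItem
    from apache_beam.transforms.display import HasDisplayData

    class _ExampleSink(HasDisplayData):  # illustrative stand-in for a Sink
      def __init__(self, num_shards=0):
        self.num_shards = num_shards

      def display_data(self):
        return {'shards': DisplayDataItem(
            self.num_shards, label='Number of Shards').drop_if_default(0)}

    # With the default of 0 the item is dropped entirely; with an explicit
    # shard count it is reported as usual.
    print(DisplayData.create_from(_ExampleSink()).items)   # []
    print(DisplayData.create_from(_ExampleSink(3)).items)  # one 'shards' item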

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index ad77dc5..6c33f53 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase):
     dd = DisplayData.create_from(sink)
     expected_items = [
         DisplayDataItemMatcher(
-            'shards', 0),
-        DisplayDataItemMatcher(
             'compression', 'auto'),
         DisplayDataItemMatcher(
             'file_pattern',

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 12af3b6..1266ed3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform):
       write_result_coll = (keyed_pcoll
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
-                           | 'WriteBundles' >> core.Map(
-                               _write_keyed_bundle, self.sink,
+                           | 'WriteBundles' >> core.ParDo(
+                               _WriteKeyedBundleDoFn(self.sink),
                                AsSingleton(init_result_coll)))
     else:
       min_shards = 1
       write_result_coll = (pcoll
                            | 'WriteBundles' >>
-                           core.ParDo(
-                               _WriteBundleDoFn(), self.sink,
-                               AsSingleton(init_result_coll))
+                           core.ParDo(_WriteBundleDoFn(self.sink),
+                                      AsSingleton(init_result_coll))
                            | 'Pair' >> core.Map(lambda x: (None, x))
                            | core.WindowInto(window.GlobalWindows())
                            | core.GroupByKey()
@@ -788,12 +787,16 @@ class _WriteBundleDoFn(core.DoFn):
   Opens a writer at the first element and closes the writer at finish_bundle().
   """
 
-  def __init__(self):
+  def __init__(self, sink):
     self.writer = None
+    self.sink = sink
 
-  def process(self, context, sink, init_result):
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def process(self, context, init_result):
     if self.writer is None:
-      self.writer = sink.open_writer(init_result, str(uuid.uuid4()))
+      self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
     self.writer.write(context.element)
 
   def finish_bundle(self, context, *args, **kwargs):
@@ -801,11 +804,20 @@ class _WriteBundleDoFn(core.DoFn):
       yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP)
 
 
-def _write_keyed_bundle(bundle, sink, init_result):
-  writer = sink.open_writer(init_result, str(uuid.uuid4()))
-  for element in bundle[1]:  # values
-    writer.write(element)
-  return window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)
+class _WriteKeyedBundleDoFn(core.DoFn):
+
+  def __init__(self, sink):
+    self.sink = sink
+
+  def display_data(self):
+    return {'sink_dd': self.sink}
+
+  def process(self, context, init_result):
+    bundle = context.element
+    writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
+    for element in bundle[1]:  # values
+      writer.write(element)
+    return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
 
 
 def _finalize_write(_, sink, init_result, write_results, min_shards):
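
The change above replaces the module-level _write_keyed_bundle function
(and the sink argument of _WriteBundleDoFn.process) with DoFns that hold
the sink directly, so returning the sink from display_data() lets the
display framework pick up the sink's own items on the 'WriteBundles'
step. A hedged sketch of that nesting, using plain HasDisplayData
stand-ins rather than real DoFns and sinks (all names here are
illustrative):

    from apache_beam.transforms.display import DisplayData
    from apache_beam.transforms.display import DisplayDataItem
    from apache_beam.transforms.display import HasDisplayData

    class _ExampleSink(HasDisplayData):
      def display_data(self):
        return {'file_pattern': DisplayDataItem('/tmp/out-*',
                                                label='File Pattern')}

    class _ExampleWriteFn(HasDisplayData):  # stands in for a write DoFn
      def __init__(self, sink):
        self.sink = sink

      def display_data(self):
        # A HasDisplayData value is expanded recursively, so the sink's
        # items end up attached to this step.
        return {'sink_dd': self.sink}

    dd = DisplayData.create_from(_ExampleWriteFn(_ExampleSink()))
    # dd.items now includes the sink's 'file_pattern' item.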

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 4cdab12..0a593df 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -83,6 +83,19 @@ class _TextSource(filebasedsource.FileBasedSource):
     self._coder = coder
     self._buffer_size = buffer_size
 
+  def display_data(self):
+    parent_dd = super(_TextSource, self).display_data()
+    parent_dd['strip_newline'] = DisplayDataItem(
+        self._strip_trailing_newlines,
+        label='Strip Trailing New Lines')
+    parent_dd['buffer_size'] = DisplayDataItem(
+        self._buffer_size,
+        label='Buffer Size')
+    parent_dd['coder'] = DisplayDataItem(
+        self._coder.__class__,
+        label='Coder')
+    return parent_dd
+
   def read_records(self, file_name, range_tracker):
     start_offset = range_tracker.start_position()
     read_buffer = _TextSource.ReadBuffer('', 0)
@@ -252,11 +265,6 @@ class ReadFromText(PTransform):
   def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)
 
-  def display_data(self):
-    return {'source_dd': self._source,
-            'strip_newline': DisplayDataItem(self._strip_trailing_newlines,
-                                             label='Strip Trailing New Lines')}
-
 
 class WriteToText(PTransform):
   """A PTransform for writing to text files."""
@@ -302,16 +310,9 @@ class WriteToText(PTransform):
           compression.
     """
 
-    self._append_trailing_newlines = append_trailing_newlines
     self._sink = _TextSink(file_path_prefix, file_name_suffix,
                            append_trailing_newlines, num_shards,
                            shard_name_template, coder, compression_type)
 
   def expand(self, pcoll):
     return pcoll | Write(self._sink)
-
-  def display_data(self):
-    return {'sink_dd': self._sink,
-            'append_newline': DisplayDataItem(
-                self._append_trailing_newlines,
-                label='Append Trailing New Lines')}
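
With the overrides above, the read/write display data now lives on the
underlying source and sink rather than on the ReadFromText/WriteToText
wrappers, which is why the transform-level display_data() methods here
(and the corresponding tests below) go away. A hedged sketch of
inspecting it at its new home; it reaches into the private _source
attribute shown earlier in this diff, so it is for illustration only:

    from apache_beam.io.textio import ReadFromText
    from apache_beam.transforms.display import DisplayData

    read = ReadFromText('prefix', validate=False)
    # The display data is now defined on the underlying _TextSource.
    dd = DisplayData.create_from(read._source)
    for item in dd.items:
      print(item)  # strip_newline, buffer_size, coder, file_pattern, ...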

http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 4b85584..07c6d9c 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -25,8 +25,6 @@ import os
 import tempfile
 import unittest
 
-import hamcrest as hc
-
 import apache_beam as beam
 import apache_beam.io.source_test_utils as source_test_utils
 
@@ -45,9 +43,6 @@ from apache_beam.io.fileio import CompressionTypes
 
 from apache_beam.test_pipeline import TestPipeline
 
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display_test import DisplayDataItemMatcher
-
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 
@@ -294,15 +289,6 @@ class TextSourceTest(unittest.TestCase):
         splits[0].source, splits[0].start_position, splits[0].stop_position,
         perform_multi_threaded_test=False)
 
-  def test_read_display_data(self):
-    read = ReadFromText('prefix', validate=False)
-    dd = DisplayData.create_from(read)
-    expected_items = [
-        DisplayDataItemMatcher('compression', 'auto'),
-        DisplayDataItemMatcher('file_pattern', 'prefix'),
-        DisplayDataItemMatcher('strip_newline', True)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
   def test_dataflow_single_file(self):
     file_name, expected_data = write_data(5)
     assert len(expected_data) == 5
@@ -506,22 +492,6 @@ class TextSinkTest(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
-  def test_write_display_data(self):
-    write = WriteToText('prefix')
-    dd = DisplayData.create_from(write)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'append_newline', True),
-        DisplayDataItemMatcher(
-            'compression', 'auto'),
-        DisplayDataItemMatcher(
-            'shards', 0),
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}'.format('prefix',
-                          '-%(shard_num)05d-of-%(num_shards)05d'))]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
   def test_write_dataflow(self):
     pipeline = TestPipeline()
     pcoll = pipeline | beam.core.Create(self.lines)
