Repository: incubator-beam Updated Branches: refs/heads/python-sdk b83f12b9f -> 3a0f01c8e
Fixes a couple of issues in FileBasedSource. (1) Updates the code so that a user-specified coder is properly propagated to sub-sources. (2) Currently each _SingleFileSource holds a reference to its FileBasedSource, while the FileBasedSource holds a reference to a ConcatSource, and the ConcatSource in turn holds references to the list of all _SingleFileSources. Serializing any single split therefore serializes every split, which results in quadratic space complexity when serializing all splits of a FileBasedSource. This CL fixes the issue by cloning the FileBasedSource before it takes a reference to the ConcatSource, so the copy referenced by the splits carries no reference to the ConcatSource. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93c5233a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93c5233a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93c5233a Branch: refs/heads/python-sdk Commit: 93c5233a1bf28e9b13412b909c2ee877bd6cf635 Parents: b83f12b Author: Chamikara Jayalath <chamik...@google.com> Authored: Thu Nov 17 19:18:26 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Nov 18 13:33:33 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filebasedsource.py | 14 ++++++++++---- .../python/apache_beam/io/filebasedsource_test.py | 18 +++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index c7bc27e..7d8f686 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -109,6 +109,12 @@ class FileBasedSource(iobase.BoundedSource): file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] sizes = 
FileBasedSource._estimate_sizes_in_parallel(file_names) + # We create a reference for FileBasedSource that will be serialized along + # with each _SingleFileSource. To prevent this FileBasedSource from having + # a reference to ConcatSource (resulting in quadratic space complexity) + # we clone it here. + file_based_source_ref = pickler.loads(pickler.dumps(self)) + for index, file_name in enumerate(file_names): if sizes[index] == 0: continue # Ignoring empty file. @@ -123,7 +129,7 @@ class FileBasedSource(iobase.BoundedSource): splittable = False single_file_source = _SingleFileSource( - self, file_name, + file_based_source_ref, file_name, 0, sizes[index], min_bundle_size=self._min_bundle_size, @@ -194,9 +200,6 @@ class FileBasedSource(iobase.BoundedSource): return self._get_concat_source().get_range_tracker(start_position, stop_position) - def default_output_coder(self): - return self._get_concat_source().default_output_coder() - def read_records(self, file_name, offset_range_tracker): """Returns a generator of records created by reading file 'file_name'. 
@@ -315,3 +318,6 @@ class _SingleFileSource(iobase.BoundedSource): def read(self, range_tracker): return self._file_based_source.read_records(self._file_name, range_tracker) + + def default_output_coder(self): + return self._file_based_source.default_output_coder() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 7f4d8d3..a455cd3 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -533,6 +533,23 @@ class TestFileBasedSource(unittest.TestCase): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_splits_get_coder_from_fbs(self): + class DummyCoder(object): + val = 12345 + + class FileBasedSourceWithCoder(LineSource): + + def default_output_coder(self): + return DummyCoder() + + pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12]) + self.assertEqual(200, len(expected_data)) + fbs = FileBasedSourceWithCoder(pattern) + splits = [split for split in fbs.split(desired_bundle_size=50)] + self.assertTrue(len(splits)) + for split in splits: + self.assertEqual(DummyCoder.val, split.source.default_output_coder().val) + class TestSingleFileSource(unittest.TestCase): @@ -685,7 +702,6 @@ class TestSingleFileSource(unittest.TestCase): read_data.extend(data_from_split) self.assertItemsEqual(expected_data[2:9], read_data) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()