Update refcounts after pipeline reconstruction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54360814 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54360814 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54360814 Branch: refs/heads/master Commit: 54360814fbc59f69e10f4bac52cfeeea5e044cf9 Parents: e2a2836 Author: Robert Bradshaw <rober...@gmail.com> Authored: Wed Apr 5 08:37:33 2017 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Apr 5 09:54:48 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 1 + sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++++++ 2 files changed, 7 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 8506b85..fdb9a9d 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -546,4 +546,5 @@ class AppliedPTransform(object): if pc not in result.inputs: pc.producer = result pc.tag = tag + result.update_input_refcounts() return result http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 37ff2a8..5889ab5 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -435,6 +435,12 @@ class PTransformTest(unittest.TestCase): assert_that(result, equal_to([])) pipeline.run() + def test_flatten_same_pcollections(self): + pipeline = TestPipeline() + pc = pipeline | beam.Create(['a', 'b']) + assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3)) + pipeline.run() + def test_flatten_pcollections_in_iterable(self): pipeline = TestPipeline() pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])