This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new b458cfd  [BEAM-7923] Include side effects in p.run
     new df482df  Merge pull request #11141 from KevinGG/BEAM-7923-fix
b458cfd is described below

commit b458cfdc65086b7476f0c949a1389dccf8a681f1
Author: KevinGG <kawai...@gmail.com>
AuthorDate: Mon Mar 16 15:03:20 2020 -0700

    [BEAM-7923] Include side effects in p.run

    1. PCollections never used as inputs and not watched, such as sinks
    without being assigned to variables will be pruned before `p.run()`.
    The change makes sure that these side effect PCollections are now
    considered as extended targets and will be executed on `p.run()`.
    2. Note the change will not affect `show`, `head` and `collect`
    because they have an additional pipeline fragment logic that already
    prunes everything unrelated before the instrumenting and the prune
    logic inside instrumenting.
---
 .../runners/interactive/pipeline_instrument.py      | 17 +++++++++++++++++
 .../runners/interactive/pipeline_instrument_test.py | 12 ++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index b39fea4..b88517c 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -403,6 +403,8 @@ class PipelineInstrument(object):
     self._pipeline
     """
     cacheable_inputs = set()
+    all_inputs = set()
+    all_outputs = set()
     unbounded_source_pcolls = set()
 
     class InstrumentVisitor(PipelineVisitor):
@@ -418,10 +420,16 @@ class PipelineInstrument(object):
                       tuple(ie.current_env().options.capturable_sources)):
           unbounded_source_pcolls.update(transform_node.outputs.values())
         cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
+        ins, outs = self._pin._all_inputs_outputs(transform_node)
+        all_inputs.update(ins)
+        all_outputs.update(outs)
 
     v = InstrumentVisitor(self)
     self._pipeline.visit(v)
+    # Every output PCollection that is never used as an input PCollection is
+    # considered as a side effect of the pipeline run and should be included.
+    self._extended_targets.update(all_outputs.difference(all_inputs))
     # Add the unbounded source pcollections to the cacheable inputs. This allows
     # for the caching of unbounded sources without a variable reference.
     cacheable_inputs.update(unbounded_source_pcolls)
@@ -720,6 +728,15 @@ class PipelineInstrument(object):
         inputs.add(in_pcoll)
     return inputs
 
+  def _all_inputs_outputs(self, transform):
+    inputs = set()
+    outputs = set()
+    for in_pcoll in transform.inputs:
+      inputs.add(in_pcoll)
+    for _, out_pcoll in transform.outputs.items():
+      outputs.add(out_pcoll)
+    return inputs, outputs
+
   def _cacheable_key(self, pcoll):
     """Gets the key a cacheable PCollection is tracked within the instrument."""
     return cacheable_key(
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 8fa9724..f10e98d 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -798,6 +798,18 @@ class PipelineInstrumentTest(unittest.TestCase):
     assert_pipeline_proto_contain_top_level_transform(
         self, full_proto, 'Init Source')
 
+  def test_side_effect_pcoll_is_included(self):
+    pipeline_with_side_effect = beam.Pipeline(
+        interactive_runner.InteractiveRunner())
+    # Deliberately not assign the result to a variable to make it a
+    # "side effect" transform. Note we never watch anything from
+    # the pipeline defined locally either.
+    # pylint: disable=range-builtin-not-iterating,expression-not-assigned
+    pipeline_with_side_effect | 'Init Create' >> beam.Create(range(10))
+    pipeline_instrument = instr.build_pipeline_instrument(
+        pipeline_with_side_effect)
+    self.assertTrue(pipeline_instrument._extended_targets)
+
 
 if __name__ == '__main__':
   unittest.main()