Repository: beam
Updated Branches:
  refs/heads/master 1974b920e -> 3360b1f68
Pin runner harness also for official BEAM releases.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7155931f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7155931f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7155931f

Branch: refs/heads/master
Commit: 7155931ff9eaf5fb85765e9d515469f5e6bd5bf9
Parents: 1974b92
Author: Valentyn Tymofieiev <valen...@google.com>
Authored: Wed Oct 18 14:25:33 2017 -0700
Committer: Valentyn Tymofieiev <valen...@google.com>
Committed: Wed Oct 18 14:39:44 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient_test.py | 30 +++++++++++++++++---
 .../runners/dataflow/internal/dependency.py     |  7 +++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/7155931f/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index fa4f89a..ecd6003 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -156,10 +156,14 @@ class UtilTest(unittest.TestCase):
         'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
         '.get_distribution',
         mock.MagicMock(return_value=distribution)):
-      env = apiclient.Environment([], pipeline_options, '2.2.0')
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
       self.assertIn(override, env.proto.experiments)
 
-  def test_harness_override_absent_in_unreleased_sdk(self):
+  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+              'beam_version.__version__', '2.2.0')
+  def test_harness_override_present_in_beam_releases(self):
     pipeline_options = PipelineOptions(
         ['--temp_location', 'gs://any-location/temp', '--streaming'])
     override = ''.join(
@@ -170,8 +174,26 @@ class UtilTest(unittest.TestCase):
         'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
         '.get_distribution',
         mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
-      env = apiclient.Environment([], pipeline_options, '2.2.0')
-      self.assertNotIn(override, env.proto.experiments)
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
+      self.assertIn(override, env.proto.experiments)
+
+  @mock.patch('apache_beam.runners.dataflow.internal.dependency.'
+              'beam_version.__version__', '2.2.0-dev')
+  def test_harness_override_absent_in_unreleased_sdk(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp', '--streaming'])
+    with mock.patch(
+        'apache_beam.runners.dataflow.internal.dependency.pkg_resources'
+        '.get_distribution',
+        mock.Mock(side_effect=pkg_resources.DistributionNotFound())):
+      env = apiclient.Environment([], #packages
+                                  pipeline_options,
+                                  '2.0.0') #any environment version
+      if env.proto.experiments:
+        for experiment in env.proto.experiments:
+          self.assertNotIn('runner_harness_container_image=', experiment)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/7155931f/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 123fc49..c1edf7d 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -500,9 +500,16 @@ def get_runner_harness_container_image():
   """
   try:
     version = pkg_resources.get_distribution(GOOGLE_PACKAGE_NAME).version
+    # Pin runner harness for Dataflow releases.
     return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
             version)
   except pkg_resources.DistributionNotFound:
+    # Pin runner harness for BEAM releases.
+    if 'dev' not in beam_version.__version__:
+      return (DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' +
+              beam_version.__version__)
+    # Don't pin runner harness for BEAM head so that we can notice
+    # potential incompatibility between runner and sdk harnesses.
     return None