DataflowRunner will raise an exception on failures. This is the same behavior as before.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed81a26 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed81a26 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed81a26 Branch: refs/heads/python-sdk Commit: 1ed81a2655a2c98655d8e5ce965eb72681388926 Parents: aa3a2cb Author: Ahmet Altay <al...@google.com> Authored: Fri Jan 20 11:06:38 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Jan 20 16:46:21 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/runners/dataflow_runner.py | 17 ++++-- .../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index fd22753..bd25dbf 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -151,7 +151,6 @@ class DataflowRunner(PipelineRunner): if not page_token: break - runner.result = DataflowPipelineResult(response, runner) runner.last_error_msg = last_error_msg def run(self, pipeline): @@ -705,9 +704,11 @@ class DataflowPipelineResult(PipelineResult): while thread.isAlive(): time.sleep(5.0) if self.state != PipelineState.DONE: - logging.error( - 'Dataflow pipeline failed. State: %s, Error:\n%s', - self.state, getattr(self._runner, 'last_error_msg', None)) + # TODO(BEAM-1290): Consider converting this to an error log based on the + # resolution of the issue. + raise DataflowRuntimeException( + 'Dataflow pipeline failed. 
State: %s, Error:\n%s' % + (self.state, getattr(self._runner, 'last_error_msg', None)), self) return self.state def __str__(self): @@ -718,3 +719,11 @@ class DataflowPipelineResult(PipelineResult): def __repr__(self): return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self))) + + +class DataflowRuntimeException(Exception): + """Indicates an error has occurred in running this pipeline.""" + + def __init__(self, msg, result): + super(DataflowRuntimeException, self).__init__(msg) + self.result = result http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py new file mode 100644 index 0000000..a935c98 --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the DataflowRunner class.""" + +import unittest +import mock + +from apache_beam.internal.clients import dataflow as dataflow_api +from apache_beam.runners.dataflow_runner import DataflowRuntimeException +from apache_beam.runners.dataflow_runner import DataflowPipelineResult + + +class DataflowRunnerTest(unittest.TestCase): + + @mock.patch('time.sleep', return_value=None) + def test_wait_until_finish(self, patched_time_sleep): + values_enum = dataflow_api.Job.CurrentStateValueValuesEnum + + class MockDataflowRunner(object): + + def __init__(self, final_state): + self.dataflow_client = mock.MagicMock() + self.job = mock.MagicMock() + self.job.currentState = values_enum.JOB_STATE_UNKNOWN + + def get_job_side_effect(*args, **kwargs): + self.job.currentState = final_state + return mock.DEFAULT + + self.dataflow_client.get_job = mock.MagicMock( + return_value=self.job, side_effect=get_job_side_effect) + self.dataflow_client.list_messages = mock.MagicMock( + return_value=([], None)) + + with self.assertRaises(DataflowRuntimeException) as e: + failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED) + failed_result = DataflowPipelineResult(failed_runner.job, failed_runner) + failed_result.wait_until_finish() + self.assertTrue( + 'Dataflow pipeline failed. State: FAILED' in e.exception.message) + + succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE) + succeeded_result = DataflowPipelineResult( + succeeded_runner.job, succeeded_runner) + succeeded_result.wait_until_finish() + + +if __name__ == '__main__': + unittest.main()