Repository: beam
Updated Branches:
  refs/heads/master 847e4e9f0 -> c3b97a287
Update DataflowPipelineResult.state at the end of poll_for_job_completion.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56512ab4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56512ab4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56512ab4

Branch: refs/heads/master
Commit: 56512ab442c599c64bfdb9fc6cabce95d76ee4dc
Parents: c03e6f3
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 23:43:42 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jan 20 23:43:42 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py      | 6 ++++--
 sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index bd25dbf..31d3386 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -78,7 +78,7 @@ class DataflowRunner(PipelineRunner):
     return 's%s' % self._unique_step_id
 
   @staticmethod
-  def poll_for_job_completion(runner, job_id):
+  def poll_for_job_completion(runner, result):
     """Polls for the specified job to finish running (successfully or not)."""
     last_message_time = None
     last_message_id = None
@@ -101,6 +101,7 @@ class DataflowRunner(PipelineRunner):
       else:
         return 0
 
+    job_id = result.job_id()
     while True:
       response = runner.dataflow_client.get_job(job_id)
       # If get() is called very soon after Create() the response may not contain
@@ -151,6 +152,7 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
+    result._job = response
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -694,7 +696,7 @@ class DataflowPipelineResult(PipelineResult):
 
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
-          args=(self._runner, self.job_id()))
+          args=(self._runner, self))
 
       # Mark the thread as a daemon thread so a keyboard interrupt on the main
       # thread will terminate everything. This is also the reason we will not

http://git-wip-us.apache.org/repos/asf/beam/blob/56512ab4/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
index a935c98..4983899 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -47,12 +47,11 @@ class DataflowRunnerTest(unittest.TestCase):
         self.dataflow_client.list_messages = mock.MagicMock(
             return_value=([], None))
 
-    with self.assertRaises(DataflowRuntimeException) as e:
+    with self.assertRaisesRegexp(
+        DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
       failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
       failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
       failed_result.wait_until_finish()
-    self.assertTrue(
-        'Dataflow pipeline failed. State: FAILED' in e.exception.message)
 
     succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
     succeeded_result = DataflowPipelineResult(