DataflowRunner will raise an exception on failures.

This is the same behavior as before.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed81a26
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed81a26
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed81a26

Branch: refs/heads/python-sdk
Commit: 1ed81a2655a2c98655d8e5ce965eb72681388926
Parents: aa3a2cb
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jan 20 11:06:38 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Jan 20 16:46:21 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      | 17 ++++--
 .../apache_beam/runners/dataflow_runner_test.py | 64 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index fd22753..bd25dbf 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -151,7 +151,6 @@ class DataflowRunner(PipelineRunner):
         if not page_token:
           break
 
-    runner.result = DataflowPipelineResult(response, runner)
     runner.last_error_msg = last_error_msg
 
   def run(self, pipeline):
@@ -705,9 +704,11 @@ class DataflowPipelineResult(PipelineResult):
       while thread.isAlive():
         time.sleep(5.0)
       if self.state != PipelineState.DONE:
-        logging.error(
-            'Dataflow pipeline failed. State: %s, Error:\n%s',
-            self.state, getattr(self._runner, 'last_error_msg', None))
+        # TODO(BEAM-1290): Consider converting this to an error log based on the
+        # resolution of the issue.
+        raise DataflowRuntimeException(
+            'Dataflow pipeline failed. State: %s, Error:\n%s' %
+            (self.state, getattr(self._runner, 'last_error_msg', None)), self)
     return self.state
 
   def __str__(self):
@@ -718,3 +719,11 @@ class DataflowPipelineResult(PipelineResult):
 
   def __repr__(self):
    return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
+
+
+class DataflowRuntimeException(Exception):
+  """Indicates an error has occurred in running this pipeline."""
+
+  def __init__(self, msg, result):
+    super(DataflowRuntimeException, self).__init__(msg)
+    self.result = result

http://git-wip-us.apache.org/repos/asf/beam/blob/1ed81a26/sdks/python/apache_beam/runners/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow_runner_test.py
new file mode 100644
index 0000000..a935c98
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow_runner_test.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DataflowRunner class."""
+
+import unittest
+import mock
+
+from apache_beam.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow_runner import DataflowPipelineResult
+
+
+class DataflowRunnerTest(unittest.TestCase):
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_until_finish(self, patched_time_sleep):
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+    class MockDataflowRunner(object):
+
+      def __init__(self, final_state):
+        self.dataflow_client = mock.MagicMock()
+        self.job = mock.MagicMock()
+        self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+
+        def get_job_side_effect(*args, **kwargs):
+          self.job.currentState = final_state
+          return mock.DEFAULT
+
+        self.dataflow_client.get_job = mock.MagicMock(
+            return_value=self.job, side_effect=get_job_side_effect)
+        self.dataflow_client.list_messages = mock.MagicMock(
+            return_value=([], None))
+
+    with self.assertRaises(DataflowRuntimeException) as e:
+      failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+      failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+      failed_result.wait_until_finish()
+    self.assertTrue(
+        'Dataflow pipeline failed. State: FAILED' in e.exception.message)
+
+    succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+    succeeded_result = DataflowPipelineResult(
+        succeeded_runner.job, succeeded_runner)
+    succeeded_result.wait_until_finish()
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to