Repository: beam Updated Branches: refs/heads/master 84a23793c -> b0b642182
Add initial bundle retry code Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f2dddda Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f2dddda Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f2dddda Branch: refs/heads/master Commit: 1f2ddddabf541b88f01b17aa9a549081a8607bb9 Parents: 84a2379 Author: Maria Garcia Herrero <mari...@google.com> Authored: Thu Aug 3 00:16:54 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Sat Aug 12 18:06:08 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/options/pipeline_options.py | 7 ++ sdks/python/apache_beam/pipeline_test.py | 30 ++++++ .../apache_beam/runners/direct/executor.py | 100 ++++++++++++------- 3 files changed, 100 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/options/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index ea996a3..db65b3c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -314,6 +314,13 @@ class DirectOptions(PipelineOptions): help='DirectRunner uses stacked WindowedValues within a Bundle for ' 'memory optimization. Set --no_direct_runner_use_stacked_bundle to ' 'avoid it.') + parser.add_argument( + '--direct_runner_bundle_retry', + action='store_true', + default=False, + help= + ('Whether to allow bundle retries. If True the maximum' + 'number of attempts to process a bundle is 4. ')) class GoogleCloudOptions(PipelineOptions): http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index aad0143..b3ac100 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -499,6 +499,36 @@ class RunnerApiTest(unittest.TestCase): self.assertEqual(MyPTransform.pickle_count[0], 20) +class DirectRunnerRetryTests(unittest.TestCase): + + def test_retry_fork_graph(self): + pipeline_options = PipelineOptions(['--direct_runner_bundle_retry']) + p = beam.Pipeline(options=pipeline_options) + + # TODO(mariagh): Remove the use of globals from the test. + global count_b, count_c # pylint: disable=global-variable-undefined + count_b, count_c = 0, 0 + + def f_b(x): + global count_b # pylint: disable=global-variable-undefined + count_b += 1 + raise Exception('exception in f_b') + + def f_c(x): + global count_c # pylint: disable=global-variable-undefined + count_c += 1 + raise Exception('exception in f_c') + + names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe']) + + fork_b = names | 'SendToB' >> beam.Map(f_b) # pylint: disable=unused-variable + fork_c = names | 'SendToC' >> beam.Map(f_c) # pylint: disable=unused-variable + + with self.assertRaises(Exception): + p.run().wait_until_finish() + assert count_b == count_c == 4 + + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/1f2dddda/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index e70e326..2e46978 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -25,10 +25,12 @@ import logging import Queue import sys import threading +import traceback from weakref import WeakValueDictionary from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import ScopedMetricsContainer +from apache_beam.options.pipeline_options import DirectOptions class _ExecutorService(object): @@ -271,6 +273,15 @@ class TransformExecutor(_ExecutorService.CallableTask): self._side_input_values = {} self.blocked = False self._call_count = 0 + self._retry_count = 0 + # Switch to turn on/off the retry of bundles. + pipeline_options = self._evaluation_context.pipeline_options + if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry: + self._max_retries_per_bundle = 1 + else: + self._max_retries_per_bundle = 4 + # TODO(mariagh): make _max_retries_per_bundle a constant + # once "bundle retry" is no longer experimental. def call(self): self._call_count += 1 @@ -288,47 +299,62 @@ class TransformExecutor(_ExecutorService.CallableTask): # available. return self._side_input_values[side_input] = value - side_input_values = [self._side_input_values[side_input] for side_input in self._applied_ptransform.side_inputs] - try: - evaluator = self._transform_evaluator_registry.get_evaluator( - self._applied_ptransform, self._input_bundle, - side_input_values, scoped_metrics_container) - - if self._fired_timers: - for timer_firing in self._fired_timers: - evaluator.process_timer_wrapper(timer_firing) - - if self._input_bundle: - for value in self._input_bundle.get_elements_iterable(): - evaluator.process_element(value) - - with scoped_metrics_container: - result = evaluator.finish_bundle() - result.logical_metric_updates = metrics_container.get_cumulative() - - if self._evaluation_context.has_cache: - for uncommitted_bundle in result.uncommitted_output_bundles: + while self._retry_count < self._max_retries_per_bundle: + try: + self.attempt_call(metrics_container, + scoped_metrics_container, + side_input_values) + break + except Exception as e: + self._retry_count += 1 + logging.info( + 'Exception at bundle %r, due to an exception: %s', + self._input_bundle, traceback.format_exc()) + if self._retry_count == self._max_retries_per_bundle: + logging.error('Giving up after %s attempts.', + self._max_retries_per_bundle) + self._completion_callback.handle_exception(self, e) + + self._evaluation_context.metrics().commit_physical( + self._input_bundle, + metrics_container.get_cumulative()) + self._transform_evaluation_state.complete(self) + + def attempt_call(self, metrics_container, + scoped_metrics_container, + side_input_values): + evaluator = self._transform_evaluator_registry.get_evaluator( + self._applied_ptransform, self._input_bundle, + side_input_values, scoped_metrics_container) + + if self._fired_timers: + for timer_firing in self._fired_timers: + evaluator.process_timer_wrapper(timer_firing) + + if self._input_bundle: + for value in self._input_bundle.get_elements_iterable(): + evaluator.process_element(value) + + with scoped_metrics_container: + result = evaluator.finish_bundle() + result.logical_metric_updates = metrics_container.get_cumulative() + + if self._evaluation_context.has_cache: + for uncommitted_bundle in result.uncommitted_output_bundles: + self._evaluation_context.append_to_cache( + self._applied_ptransform, uncommitted_bundle.tag, + uncommitted_bundle.get_elements_iterable()) + undeclared_tag_values = result.undeclared_tag_values + if undeclared_tag_values: + for tag, value in undeclared_tag_values.iteritems(): self._evaluation_context.append_to_cache( - self._applied_ptransform, uncommitted_bundle.tag, - uncommitted_bundle.get_elements_iterable()) - undeclared_tag_values = result.undeclared_tag_values - if undeclared_tag_values: - for tag, value in undeclared_tag_values.iteritems(): - self._evaluation_context.append_to_cache( - self._applied_ptransform, tag, value) - - self._completion_callback.handle_result(self, self._input_bundle, result) - return result - except Exception as e: # pylint: disable=broad-except - self._completion_callback.handle_exception(self, e) - finally: - self._evaluation_context.metrics().commit_physical( - self._input_bundle, - metrics_container.get_cumulative()) - self._transform_evaluation_state.complete(self) + self._applied_ptransform, tag, value) + + self._completion_callback.handle_result(self, self._input_bundle, result) + return result class Executor(object):