Integrate bundle retry code for the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a183f95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a183f95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a183f95

Branch: refs/heads/tez-runner
Commit: 9a183f95d2a9f1d1dd4124da7d609b81a3b69d8e
Parents: 2b4a6b5
Author: Maria Garcia Herrero <mari...@google.com>
Authored: Fri Nov 10 14:50:16 2017 -0800
Committer: chamik...@google.com <chamik...@google.com>
Committed: Mon Nov 13 11:36:27 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py | 7 -------
 sdks/python/apache_beam/pipeline_test.py | 3 +--
 sdks/python/apache_beam/runners/direct/executor.py | 14 +-------------
 3 files changed, 2 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5278b8a..aaac9a4 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -313,13 +313,6 @@ class DirectOptions(PipelineOptions):
 help='DirectRunner uses stacked WindowedValues within a Bundle for '
 'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
 'avoid it.')
- parser.add_argument(
- '--direct_runner_bundle_retry',
- action='store_true',
- default=False,
- help=
- ('Whether to allow bundle retries. If True the maximum'
- 'number of attempts to process a bundle is 4. '))


 class GoogleCloudOptions(PipelineOptions):

http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 9bbb0d7..567ab92 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -506,8 +506,7 @@ class RunnerApiTest(unittest.TestCase):
 class DirectRunnerRetryTests(unittest.TestCase):

 def test_retry_fork_graph(self):
- pipeline_options = PipelineOptions(['--direct_runner_bundle_retry'])
- p = beam.Pipeline(options=pipeline_options)
+ p = beam.Pipeline()

 # TODO(mariagh): Remove the use of globals from the test.
 global count_b, count_c # pylint: disable=global-variable-undefined

http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 51fe908..853f19f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -30,7 +30,6 @@ from weakref import WeakValueDictionary

 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.options.pipeline_options import DirectOptions


 class _ExecutorService(object):
@@ -278,13 +277,7 @@ class TransformExecutor(_ExecutorService.CallableTask):
 self.blocked = False
 self._call_count = 0
 self._retry_count = 0
- # Switch to turn on/off the retry of bundles.
- pipeline_options = self._evaluation_context.pipeline_options
- # TODO(mariagh): Remove once "bundle retry" is no longer experimental.
- if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
- self._max_retries_per_bundle = 1
- else:
- self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
+ self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE

 def call(self):
 self._call_count += 1
@@ -319,11 +312,6 @@ class TransformExecutor(_ExecutorService.CallableTask):
 if self._retry_count == self._max_retries_per_bundle:
 logging.error('Giving up after %s attempts.',
 self._max_retries_per_bundle)
- if self._retry_count == 1:
- logging.info(
- 'Use the experimental flag --direct_runner_bundle_retry'
- ' to retry failed bundles (up to %d times).',
- TransformExecutor._MAX_RETRY_PER_BUNDLE)
 self._completion_callback.handle_exception(self, e)

 self._evaluation_context.metrics().commit_physical(