This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new d920644 [BEAM-3537] Allow more general eager in-process pipeline execution (#4492) d920644 is described below commit d9206441a16fa39e171684c43ec724cdd80a6ca1 Author: Charles Chen <charlesccyc...@users.noreply.github.com> AuthorDate: Mon Jan 29 14:43:32 2018 -0800 [BEAM-3537] Allow more general eager in-process pipeline execution (#4492) [BEAM-3537] Allow more general eager in-process pipeline execution This change also removes the Python DirectRunner-specific PValue cache. --- .../apache_beam/examples/snippets/snippets_test.py | 31 +++--- sdks/python/apache_beam/runners/__init__.py | 1 - .../apache_beam/runners/direct/direct_runner.py | 55 ---------- .../runners/direct/evaluation_context.py | 14 --- sdks/python/apache_beam/runners/direct/executor.py | 11 -- sdks/python/apache_beam/runners/runner.py | 2 +- sdks/python/apache_beam/transforms/ptransform.py | 119 ++++++++++++++++++--- .../apache_beam/transforms/ptransform_test.py | 16 ++- 8 files changed, 136 insertions(+), 113 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index f05dc39..e731236 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -223,24 +223,27 @@ class ParDoTest(unittest.TestCase): self.assertEqual({'xyz'}, set(marked)) def test_pardo_with_undeclared_outputs(self): - numbers = [1, 2, 3, 4, 5, 10, 20] + # Note: the use of undeclared outputs is currently not supported in eager + # execution mode. 
+ with TestPipeline() as p: + numbers = p | beam.Create([1, 2, 3, 4, 5, 10, 20]) - # [START model_pardo_with_undeclared_outputs] - def even_odd(x): - yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x) - if x % 10 == 0: - yield x + # [START model_pardo_with_undeclared_outputs] + def even_odd(x): + yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x) + if x % 10 == 0: + yield x - results = numbers | beam.FlatMap(even_odd).with_outputs() + results = numbers | beam.FlatMap(even_odd).with_outputs() - evens = results.even - odds = results.odd - tens = results[None] # the undeclared main output - # [END model_pardo_with_undeclared_outputs] + evens = results.even + odds = results.odd + tens = results[None] # the undeclared main output + # [END model_pardo_with_undeclared_outputs] - self.assertEqual({2, 4, 10, 20}, set(evens)) - self.assertEqual({1, 3, 5}, set(odds)) - self.assertEqual({10, 20}, set(tens)) + assert_that(evens, equal_to([2, 4, 10, 20]), label='assert_even') + assert_that(odds, equal_to([1, 3, 5]), label='assert_odds') + assert_that(tens, equal_to([10, 20]), label='assert_tens') class TypeHintsTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py index 2b93c30..863e67e 100644 --- a/sdks/python/apache_beam/runners/__init__.py +++ b/sdks/python/apache_beam/runners/__init__.py @@ -21,7 +21,6 @@ This package defines runners, which are used to execute a pipeline. 
""" from apache_beam.runners.direct.direct_runner import DirectRunner -from apache_beam.runners.direct.direct_runner import EagerRunner from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState from apache_beam.runners.runner import create_runner diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2bd6b45..b18d492 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -23,7 +23,6 @@ graph of transformations belonging to a pipeline on the local machine. from __future__ import absolute_import -import collections import logging from google.protobuf import wrappers_pb2 @@ -41,7 +40,6 @@ from apache_beam.runners.direct.clock import TestClock from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache from apache_beam.transforms.core import _GroupAlsoByWindow from apache_beam.transforms.core import _GroupByKeyOnly from apache_beam.transforms.ptransform import PTransform @@ -106,7 +104,6 @@ class DirectRunner(PipelineRunner): """Executes a single pipeline on the local machine.""" def __init__(self): - self._cache = None self._use_test_clock = False # use RealClock() in production self._ptransform_overrides = _get_transform_overrides() @@ -229,8 +226,6 @@ class DirectRunner(PipelineRunner): self.consumer_tracking_visitor.views, clock) - evaluation_context.use_pvalue_cache(self._cache) - executor = Executor(self.consumer_tracking_visitor.value_to_consumers, TransformEvaluatorRegistry(evaluation_context), evaluation_context) @@ -242,53 +237,8 @@ class DirectRunner(PipelineRunner): executor.start(self.consumer_tracking_visitor.root_transforms) result = DirectPipelineResult(executor, evaluation_context) - if self._cache: 
- # We are running in eager mode, block until the pipeline execution - # completes in order to have full results in the cache. - result.wait_until_finish() - self._cache.finalize() - return result - @property - def cache(self): - if not self._cache: - self._cache = BufferingInMemoryCache() - return self._cache.pvalue_cache - - -class BufferingInMemoryCache(object): - """PValueCache wrapper for buffering bundles until a PValue is fully computed. - - BufferingInMemoryCache keeps an in memory cache of - (applied_ptransform, tag) tuples. It accepts appending to existing cache - entries until it is finalized. finalize() will make all the existing cached - entries visible to the underyling PValueCache in their entirety, clean the in - memory cache and stop accepting new cache entries. - """ - - def __init__(self): - self._cache = collections.defaultdict(list) - self._pvalue_cache = PValueCache() - self._finalized = False - - @property - def pvalue_cache(self): - return self._pvalue_cache - - def append(self, applied_ptransform, tag, elements): - assert not self._finalized - assert elements is not None - self._cache[(applied_ptransform, tag)].extend(elements) - - def finalize(self): - """Make buffered cache elements visible to the underlying PValueCache.""" - assert not self._finalized - for key, value in self._cache.iteritems(): - applied_ptransform, tag = key - self._pvalue_cache.cache_output(applied_ptransform, tag, value) - self._cache = None - class DirectPipelineResult(PipelineResult): """A DirectPipelineResult provides access to info about a pipeline.""" @@ -329,8 +279,3 @@ class DirectPipelineResult(PipelineResult): def metrics(self): return self._evaluation_context.metrics() - - -class EagerRunner(DirectRunner): - - is_eager = True diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 718dafa..46176c9 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py 
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -155,7 +155,6 @@ class EvaluationContext(object): self._side_inputs_container = _SideInputsContainer(views) self._pending_unblocked_tasks = [] self._counter_factory = counters.CounterFactory() - self._cache = None self._metrics = DirectMetrics() self._lock = threading.Lock() @@ -169,23 +168,10 @@ class EvaluationContext(object): transform_keyed_states[consumer] = {} return transform_keyed_states - def use_pvalue_cache(self, cache): - assert not self._cache - self._cache = cache - def metrics(self): # TODO. Should this be made a @property? return self._metrics - @property - def has_cache(self): - return self._cache is not None - - def append_to_cache(self, applied_ptransform, tag, elements): - with self._lock: - assert self._cache - self._cache.append(applied_ptransform, tag, elements) - def is_root_transform(self, applied_ptransform): return applied_ptransform in self._root_transforms diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 1cbabc4..d4d9cb5 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -341,17 +341,6 @@ class TransformExecutor(_ExecutorService.CallableTask): result = evaluator.finish_bundle() result.logical_metric_updates = metrics_container.get_cumulative() - if self._evaluation_context.has_cache: - for uncommitted_bundle in result.uncommitted_output_bundles: - self._evaluation_context.append_to_cache( - self._applied_ptransform, uncommitted_bundle.tag, - uncommitted_bundle.get_elements_iterable()) - undeclared_tag_values = result.undeclared_tag_values - if undeclared_tag_values: - for tag, value in undeclared_tag_values.iteritems(): - self._evaluation_context.append_to_cache( - self._applied_ptransform, tag, value) - self._completion_callback.handle_result(self, self._input_bundle, result) return result diff --git 
a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 78ae4d8..1d0f700 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -45,7 +45,7 @@ _PYTHON_RPC_DIRECT_RUNNER = ( 'python_rpc_direct_runner.') _KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',) -_KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner') +_KNOWN_DIRECT_RUNNERS = ('DirectRunner',) _KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',) _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 0b6d608..2490495 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -38,9 +38,11 @@ from __future__ import absolute_import import copy import inspect +import itertools import operator import os import sys +import threading from functools import reduce from google.protobuf import wrappers_pb2 @@ -97,29 +99,116 @@ class _SetInputPValues(_PValueishTransform): return self.visit_nested(node, replacements) +# Caches to allow for materialization of values when executing a pipeline +# in-process, in eager mode. This cache allows the same _MaterializedResult +# object to be accessed and used despite Runner API round-trip serialization. 
+_pipeline_materialization_cache = {} +_pipeline_materialization_lock = threading.Lock() + + +def _allocate_materialized_pipeline(pipeline): + pid = os.getpid() + with _pipeline_materialization_lock: + pipeline_id = id(pipeline) + _pipeline_materialization_cache[(pid, pipeline_id)] = {} + + +def _allocate_materialized_result(pipeline): + pid = os.getpid() + with _pipeline_materialization_lock: + pipeline_id = id(pipeline) + if (pid, pipeline_id) not in _pipeline_materialization_cache: + raise ValueError('Materialized pipeline is not allocated for result ' + 'cache.') + result_id = len(_pipeline_materialization_cache[(pid, pipeline_id)]) + result = _MaterializedResult(pipeline_id, result_id) + _pipeline_materialization_cache[(pid, pipeline_id)][result_id] = result + return result + + +def _get_materialized_result(pipeline_id, result_id): + pid = os.getpid() + with _pipeline_materialization_lock: + if (pid, pipeline_id) not in _pipeline_materialization_cache: + raise Exception( + 'Materialization in out-of-process and remote runners is not yet ' + 'supported.') + return _pipeline_materialization_cache[(pid, pipeline_id)][result_id] + + +def _release_materialized_pipeline(pipeline): + pid = os.getpid() + with _pipeline_materialization_lock: + pipeline_id = id(pipeline) + del _pipeline_materialization_cache[(pid, pipeline_id)] + + +class _MaterializedResult(object): + def __init__(self, pipeline_id, result_id): + self._pipeline_id = pipeline_id + self._result_id = result_id + self.elements = [] + + def __reduce__(self): + # When unpickled (during Runner API roundtrip serialization), get the + # _MaterializedResult object from the cache so that values are written + # to the original _MaterializedResult when run in eager mode. 
+ return (_get_materialized_result, (self._pipeline_id, self._result_id)) + + class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple): - def __init__(self, deferred, pvalue_cache): + def __init__(self, deferred, results_by_tag): super(_MaterializedDoOutputsTuple, self).__init__( None, None, deferred._tags, deferred._main_tag) self._deferred = deferred - self._pvalue_cache = pvalue_cache + self._results_by_tag = results_by_tag def __getitem__(self, tag): - # Simply accessing the value should not use it up. - return self._pvalue_cache.get_unwindowed_pvalue( - self._deferred[tag], decref=False) + if tag not in self._results_by_tag: + raise KeyError( + 'Tag %r is not a a defined output tag of %s.' % ( + tag, self._deferred)) + return self._results_by_tag[tag].elements + + +class _AddMaterializationTransforms(_PValueishTransform): + def _materialize_transform(self, pipeline): + result = _allocate_materialized_result(pipeline) -class _MaterializePValues(_PValueishTransform): - def __init__(self, pvalue_cache): - self._pvalue_cache = pvalue_cache + # Need to define _MaterializeValuesDoFn here to avoid circular + # dependencies. + from apache_beam import DoFn + from apache_beam import ParDo + + class _MaterializeValuesDoFn(DoFn): + def process(self, element): + result.elements.append(element) + + materialization_label = '_MaterializeValues%d' % result._result_id + return (materialization_label >> ParDo(_MaterializeValuesDoFn()), + result) def visit(self, node): if isinstance(node, pvalue.PValue): - # Simply accessing the value should not use it up. 
- return self._pvalue_cache.get_unwindowed_pvalue(node, decref=False) + transform, result = self._materialize_transform(node.pipeline) + node | transform + return result elif isinstance(node, pvalue.DoOutputsTuple): - return _MaterializedDoOutputsTuple(node, self._pvalue_cache) + results_by_tag = {} + for tag in itertools.chain([node._main_tag], node._tags): + results_by_tag[tag] = self.visit(node[tag]) + return _MaterializedDoOutputsTuple(node, results_by_tag) + else: + return self.visit_nested(node) + + +class _FinalizeMaterialization(_PValueishTransform): + def visit(self, node): + if isinstance(node, _MaterializedResult): + return node.elements + elif isinstance(node, _MaterializedDoOutputsTuple): + return node else: return self.visit_nested(node) @@ -399,11 +488,11 @@ class PTransform(WithTypeHints, HasDisplayData): result = p.apply(self, pvalueish, label) if deferred: return result - # Get a reference to the runners internal cache, otherwise runner may - # clean it after run. - cache = p.runner.cache + _allocate_materialized_pipeline(p) + materialized_result = _AddMaterializationTransforms().visit(result) p.run().wait_until_finish() - return _MaterializePValues(cache).visit(result) + _release_materialized_pipeline(p) + return _FinalizeMaterialization().visit(materialized_result) def _extract_input_pvalues(self, pvalueish): """Extract all the pvalues contained in the input pvalueish. diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 09ac72b..1f01c9c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -1407,7 +1407,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "Valid object instance must be of type 'tuple'. 
Instead, " "an instance of 'float' was received.") - def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self): + def test_pipeline_runtime_checking_violation_with_side_inputs_decorator(self): self.p._options.view_as(TypeOptions).pipeline_type_check = False self.p._options.view_as(TypeOptions).runtime_type_check = True @@ -1427,7 +1427,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "Expected an instance of <type 'int'>, " "instead found 1.0, an instance of <type 'float'>.") - def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self): + def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self): # pylint: disable=line-too-long self.p._options.view_as(TypeOptions).runtime_type_check = True self.p._options.view_as(TypeOptions).pipeline_type_check = False @@ -2062,6 +2062,18 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): x = self.p | 'C2' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type) + def test_eager_execution(self): + doubled = [1, 2, 3, 4] | beam.Map(lambda x: 2 * x) + self.assertEqual([2, 4, 6, 8], doubled) + + def test_eager_execution_tagged_outputs(self): + result = [1, 2, 3, 4] | beam.Map( + lambda x: pvalue.TaggedOutput('bar', 2 * x)).with_outputs('bar') + self.assertEqual([2, 4, 6, 8], result.bar) + with self.assertRaises(KeyError, + msg='Tag \'foo\' is not a defined output tag'): + result.foo + if __name__ == '__main__': unittest.main() -- To stop receiving notification emails like this one, please contact rober...@apache.org.