Change side inputs to be references rather than full PValues. This is more consistent with the Runner API's structure.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/207de81b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/207de81b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/207de81b Branch: refs/heads/master Commit: 207de81bca4c3761cf663d32f9b95a022ef97165 Parents: 132d3c5 Author: Robert Bradshaw <rober...@gmail.com> Authored: Thu Mar 30 08:20:21 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Mar 31 12:11:00 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 12 +- sdks/python/apache_beam/pvalue.py | 258 +++++++------------ sdks/python/apache_beam/pvalue_test.py | 33 --- .../runners/dataflow/dataflow_runner.py | 29 ++- .../runners/direct/bundle_factory.py | 3 +- .../consumer_tracking_pipeline_visitor.py | 11 +- .../consumer_tracking_pipeline_visitor_test.py | 4 +- .../runners/direct/evaluation_context.py | 60 +++-- .../apache_beam/runners/direct/executor.py | 7 +- .../runners/direct/transform_evaluator.py | 51 +--- sdks/python/apache_beam/transforms/core.py | 2 +- .../python/apache_beam/transforms/ptransform.py | 4 +- .../python/apache_beam/transforms/sideinputs.py | 132 ---------- .../apache_beam/transforms/sideinputs_test.py | 91 +++---- 14 files changed, 198 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index be2a79d..ee5904b 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -142,9 +142,6 @@ class Pipeline(object): # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. 
self.applied_labels = set() - # Store cache of views created from PCollections. For reference, see - # pvalue._cache_view(). - self._view_cache = {} def _current_transform(self): """Returns the transform currently on the top of the stack.""" @@ -271,8 +268,8 @@ class Pipeline(object): result.producer = current # TODO(robertwb): Multi-input, multi-output inference. # TODO(robertwb): Ideally we'd do intersection here. - if (type_options is not None and type_options.pipeline_type_check and - isinstance(result, (pvalue.PCollection, pvalue.PCollectionView)) + if (type_options is not None and type_options.pipeline_type_check + and isinstance(result, pvalue.PCollection) and not result.element_type): input_element_type = ( inputs[0].element_type @@ -416,7 +413,7 @@ class AppliedPTransform(object): if not isinstance(main_input, pvalue.PBegin): real_producer(main_input).refcounts[main_input.tag] += 1 for side_input in self.side_inputs: - real_producer(side_input).refcounts[side_input.tag] += 1 + real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1 def add_output(self, output, tag=None): if isinstance(output, pvalue.DoOutputsTuple): @@ -456,7 +453,8 @@ class AppliedPTransform(object): # Visit side inputs. for pval in self.side_inputs: - if isinstance(pval, pvalue.PCollectionView) and pval not in visited: + if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited: + pval = pval.pvalue # Unpack marker-object-wrapped pvalue. assert pval.producer is not None pval.producer.visit(visitor, pipeline, visited) # The value should be visited now since we visit outputs too. 
http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 4114b3f..bfe1745 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -26,9 +26,10 @@ produced when the pipeline gets executed. from __future__ import absolute_import -import collections import itertools +from apache_beam import typehints + class PValue(object): """Base class for PCollection. @@ -250,20 +251,22 @@ class SideOutputValue(object): self.value = value -class PCollectionView(PValue): - """An immutable view of a PCollection that can be used as a side input.""" +class AsSideInput(object): + """Marker specifying that a PCollection will be used as a side input. - def __init__(self, pipeline, window_mapping_fn): - """Initializes a PCollectionView. Do not call directly.""" - super(PCollectionView, self).__init__(pipeline) - self._window_mapping_fn = window_mapping_fn + When a PCollection is supplied as a side input to a PTransform, it is + necessary to indicate how the PCollection should be made available + as a PTransform side argument (e.g. in the form of an iterable, mapping, + or single value). This class is the superclass of all the various + options, and should not be instantiated directly. (See instead AsSingleton, + AsIter, etc.) + """ - @property - def windowing(self): - if not hasattr(self, '_windowing'): - self._windowing = self.producer.transform.get_windowing( - self.producer.inputs) - return self._windowing + def __init__(self, pcoll): + from apache_beam.transforms import sideinputs + self.pvalue = pcoll + self._window_mapping_fn = sideinputs.default_window_mapping_fn( + pcoll.windowing.windowfn) def _view_options(self): """Internal options corresponding to specific view. 
@@ -275,19 +278,39 @@ class PCollectionView(PValue): """ return {'window_mapping_fn': self._window_mapping_fn} + @property + def element_type(self): + return typehints.Any + + +class AsSingleton(AsSideInput): + """Marker specifying that an entire PCollection is to be used as a side input. + + When a PCollection is supplied as a side input to a PTransform, it is + necessary to indicate whether the entire PCollection should be made available + as a PTransform side argument (in the form of an iterable), or whether just + one value should be pulled from the PCollection and supplied as the side + argument (as an ordinary value). -class SingletonPCollectionView(PCollectionView): - """A PCollectionView that contains a single object.""" + Wrapping a PCollection side input argument to a PTransform in this container + (e.g., data.apply('label', MyPTransform(), AsSingleton(my_side_input) ) + selects the latter behavior. - def __init__(self, pipeline, has_default, default_value, - window_mapping_fn): - super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn) - self.has_default = has_default + The input PCollection must contain exactly one value per window, unless a + default is given, in which case it may be empty. 
+ """ + _NO_DEFAULT = object() + + def __init__(self, pcoll, default_value=_NO_DEFAULT): + super(AsSingleton, self).__init__(pcoll) self.default_value = default_value + def __repr__(self): + return 'AsSingleton(%s)' % self.pvalue + def _view_options(self): - base = super(SingletonPCollectionView, self)._view_options() - if self.has_default: + base = super(AsSingleton, self)._view_options() + if self.default_value != AsSingleton._NO_DEFAULT: return dict(base, default=self.default_value) else: return base @@ -304,182 +327,83 @@ class SingletonPCollectionView(PCollectionView): 'PCollection with more than one element accessed as ' 'a singleton view.') - -class IterablePCollectionView(PCollectionView): - """A PCollectionView that can be treated as an iterable.""" - - @staticmethod - def _from_runtime_iterable(it, options): - return it + @property + def element_type(self): + return self.pvalue.element_type -class ListPCollectionView(PCollectionView): - """A PCollectionView that can be treated as a list.""" +class AsIter(AsSideInput): + """Marker specifying that an entire PCollection is to be used as a side input. - @staticmethod - def _from_runtime_iterable(it, options): - return list(it) + When a PCollection is supplied as a side input to a PTransform, it is + necessary to indicate whether the entire PCollection should be made available + as a PTransform side argument (in the form of an iterable), or whether just + one value should be pulled from the PCollection and supplied as the side + argument (as an ordinary value). + Wrapping a PCollection side input argument to a PTransform in this container + (e.g., data.apply('label', MyPTransform(), AsIter(my_side_input) ) selects the + former behavor. 
+ """ -class DictPCollectionView(PCollectionView): - """A PCollectionView that can be treated as a dict.""" + def __repr__(self): + return 'AsIter(%s)' % self.pvalue @staticmethod def _from_runtime_iterable(it, options): - return dict(it) - - -def _get_cached_view(pipeline, key): - return pipeline._view_cache.get(key, None) # pylint: disable=protected-access - - -def _cache_view(pipeline, key, view): - pipeline._view_cache[key] = view # pylint: disable=protected-access - - -def _format_view_label(pcoll): - # The monitoring UI doesn't like '/' character in transform labels. - if not pcoll.producer: - return str(pcoll.tag) - return '%s.%s' % (pcoll.producer.full_label.replace('/', '|'), - pcoll.tag) - - -_SINGLETON_NO_DEFAULT = object() - + return it -def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pylint: disable=invalid-name - """Create a SingletonPCollectionView from the contents of input PCollection. + @property + def element_type(self): + return typehints.Iterable[self.pvalue.element_type] - The input PCollection should contain at most one element (per window) and the - resulting PCollectionView can then be used as a side input to PTransforms. If - the PCollectionView is empty (for a given window), the side input value will - be the default_value, if specified; otherwise, it will be an EmptySideInput - object. - Args: - pcoll: Input pcollection. - default_value: Default value for the singleton view. - label: Label to be specified if several AsSingleton's with different - defaults for the same PCollection. +class AsList(AsSideInput): + """Marker specifying that an entire PCollection is to be used as a side input. - Returns: - A singleton PCollectionView containing the element as above. - """ - label = label or _format_view_label(pcoll) - has_default = default_value is not _SINGLETON_NO_DEFAULT - if not has_default: - default_value = None - - # Don't recreate the view if it was already created. 
- hashable_default_value = ('val', default_value) - if not isinstance(default_value, collections.Hashable): - # Massage default value to treat as hash key. - hashable_default_value = ('id', id(default_value)) - cache_key = (pcoll, AsSingleton, has_default, hashable_default_value) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position - view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value, - label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - - -def AsIter(pcoll, label=None): # pylint: disable=invalid-name - """Create an IterablePCollectionView from the elements of input PCollection. - - The contents of the given PCollection will be available as an iterable in - PTransforms that use the returned PCollectionView as a side input. + Intended for use in side-argument specification---the same places where + AsSingleton and AsIter are used, but forces materialization of this + PCollection as a list. Args: pcoll: Input pcollection. - label: Label to be specified if several AsIter's for the same PCollection. Returns: - An iterable PCollectionView containing the elements as above. + An AsList-wrapper around a PCollection whose one element is a list + containing all elements in pcoll. """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. 
- cache_key = (pcoll, AsIter) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position - view = (pcoll | sideinputs.ViewAsIterable(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view + @staticmethod + def _from_runtime_iterable(it, options): + return list(it) -def AsList(pcoll, label=None): # pylint: disable=invalid-name - """Create a ListPCollectionView from the elements of input PCollection. +class AsDict(AsSideInput): + """Marker specifying a PCollection to be used as an indexable side input. - The contents of the given PCollection will be available as a list-like object - in PTransforms that use the returned PCollectionView as a side input. + Intended for use in side-argument specification---the same places where + AsSingleton and AsIter are used, but returns an interface that allows + key lookup. Args: - pcoll: Input pcollection. - label: Label to be specified if several AsList's for the same PCollection. + pcoll: Input pcollection. All elements should be key-value pairs (i.e. + 2-tuples) with unique keys. Returns: - A list PCollectionView containing the elements as above. + An AsDict-wrapper around a PCollection whose one element is a dict with + entries for uniquely-keyed pairs in pcoll. """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. 
- cache_key = (pcoll, AsList) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position - view = (pcoll | sideinputs.ViewAsList(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view - -def AsDict(pcoll, label=None): # pylint: disable=invalid-name - """Create a DictPCollectionView from the elements of input PCollection. + @staticmethod + def _from_runtime_iterable(it, options): + return dict(it) - The contents of the given PCollection whose elements are 2-tuples of key and - value will be available as a dict-like object in PTransforms that use the - returned PCollectionView as a side input. - Args: - pcoll: Input pcollection containing 2-tuples of key and value. - label: Label to be specified if several AsDict's for the same PCollection. - - Returns: - A dict PCollectionView containing the dict as above. - """ - label = label or _format_view_label(pcoll) - - # Don't recreate the view if it was already created. - cache_key = (pcoll, AsDict) - cached_view = _get_cached_view(pcoll.pipeline, cache_key) - if cached_view: - return cached_view - - # Local import is required due to dependency loop; even though the - # implementation of this function requires concepts defined in modules that - # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position - view = (pcoll | sideinputs.ViewAsDict(label=label)) - _cache_view(pcoll.pipeline, cache_key, view) - return view +# For backwards compatibility with worker code. 
+SingletonPCollectionView = AsSingleton +IterablePCollectionView = AsIter +ListPCollectionView = AsList +DictPCollectionView = AsDict class EmptySideInput(object): http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/pvalue_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py index 86f1987..529ddf7 100644 --- a/sdks/python/apache_beam/pvalue_test.py +++ b/sdks/python/apache_beam/pvalue_test.py @@ -19,22 +19,8 @@ import unittest -from apache_beam.pipeline import Pipeline -from apache_beam.pvalue import AsDict -from apache_beam.pvalue import AsIter -from apache_beam.pvalue import AsList -from apache_beam.pvalue import AsSingleton from apache_beam.pvalue import PValue from apache_beam.test_pipeline import TestPipeline -from apache_beam.transforms import Create - - -class FakePipeline(Pipeline): - """Fake pipeline object used to check if apply() receives correct args.""" - - def apply(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs class PValueTest(unittest.TestCase): @@ -44,25 +30,6 @@ class PValueTest(unittest.TestCase): value = PValue(pipeline) self.assertEqual(pipeline, value.pipeline) - def test_pcollectionview_not_recreated(self): - pipeline = TestPipeline() - value = pipeline | 'create1' >> Create([1, 2, 3]) - value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)]) - value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)]) - self.assertEqual(AsSingleton(value), AsSingleton(value)) - self.assertEqual(AsSingleton(value, default_value=1, label='new'), - AsSingleton(value, default_value=1, label='new')) - self.assertNotEqual(AsSingleton(value), - AsSingleton(value, default_value=1, label='new')) - self.assertEqual(AsIter(value), AsIter(value)) - self.assertEqual(AsList(value), AsList(value)) - self.assertEqual(AsDict(value2), AsDict(value2)) - - self.assertNotEqual(AsSingleton(value), 
AsSingleton(value2)) - self.assertNotEqual(AsIter(value), AsIter(value2)) - self.assertNotEqual(AsList(value), AsList(value2)) - self.assertNotEqual(AsDict(value2), AsDict(value3)) - if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index a82671c..db433df 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -31,7 +31,7 @@ from apache_beam import coders from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.internal.gcp import json_value -from apache_beam.pvalue import PCollectionView +from apache_beam.pvalue import AsSideInput from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics from apache_beam.runners.dataflow.internal import names from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api @@ -284,23 +284,26 @@ class DataflowRunner(PipelineRunner): PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) - def run_CreatePCollectionView(self, transform_node): - step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON, - transform_node.full_label, transform_node) - input_tag = transform_node.inputs[0].tag - input_step = self._cache.get_pvalue(transform_node.inputs[0]) + def _add_singleton_step(self, label, full_label, tag, input_step): + """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" + # Import here to avoid adding the dependency for local running scenarios. 
+ from google.cloud.dataflow.internal import apiclient + step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label) + self.job.proto.steps.append(step.proto) + step.add_property(PropertyNames.USER_NAME, full_label) step.add_property( PropertyNames.PARALLEL_INPUT, {'@type': 'OutputReference', PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)}) + PropertyNames.OUTPUT_NAME: input_step.get_output(tag)}) step.encoding = self._get_side_input_encoding(input_step.encoding) step.add_property( PropertyNames.OUTPUT_INFO, [{PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), + '%s.%s' % (full_label, PropertyNames.OUTPUT)), PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}]) + return step def run_Flatten(self, transform_node): step = self._add_step(TransformNames.FLATTEN, @@ -375,8 +378,12 @@ class DataflowRunner(PipelineRunner): si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name lookup_label = lambda side_pval: si_labels[side_pval] for side_pval in transform_node.side_inputs: - assert isinstance(side_pval, PCollectionView) - si_label = lookup_label(side_pval) + assert isinstance(side_pval, AsSideInput) + si_label = self._get_unique_step_name() + si_full_label = '%s/%s' % (transform_node.full_label, si_label) + self._add_singleton_step( + si_label, si_full_label, side_pval.pvalue.tag, + self._cache.get_pvalue(side_pval.pvalue)) si_dict[si_label] = { '@type': 'OutputReference', PropertyNames.STEP_NAME: si_label, http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/bundle_factory.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py index 63319af..647b5f2 100644 --- 
a/sdks/python/apache_beam/runners/direct/bundle_factory.py +++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py @@ -106,8 +106,7 @@ class Bundle(object): self._initial_windowed_value.windows) def __init__(self, pcollection, stacked=True): - assert (isinstance(pcollection, pvalue.PCollection) - or isinstance(pcollection, pvalue.PCollectionView)) + assert isinstance(pcollection, pvalue.PCollection) self._pcollection = pcollection self._elements = [] self._stacked = stacked http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py index 6f1757a..cdfadb7 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py @@ -34,18 +34,13 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor): def __init__(self): self.value_to_consumers = {} # Map from PValue to [AppliedPTransform]. self.root_transforms = set() # set of (root) AppliedPTransforms. - self.views = [] # list of PCollectionViews. + self.views = [] # list of side inputs. self.step_names = {} # Map from AppliedPTransform to String. 
self._num_transforms = 0 - def visit_value(self, value, producer_node): - if value: - if isinstance(value, pvalue.PCollectionView): - self.views.append(value) - def visit_transform(self, applied_ptransform): - inputs = applied_ptransform.inputs + inputs = list(applied_ptransform.inputs) if inputs: for input_value in inputs: if isinstance(input_value, pvalue.PBegin): @@ -57,3 +52,5 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor): self.root_transforms.add(applied_ptransform) self.step_names[applied_ptransform] = 's%d' % (self._num_transforms) self._num_transforms += 1 + for side_input in applied_ptransform.side_inputs: + self.views.append(side_input) http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 73b897f..eb8b14b 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -102,10 +102,10 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_transforms = sorted( [t.transform for t in self.visitor.root_transforms]) self.assertEqual(root_transforms, sorted([root_create])) - self.assertEqual(len(self.visitor.step_names), 4) + self.assertEqual(len(self.visitor.step_names), 3) self.assertEqual(len(self.visitor.views), 1) self.assertTrue(isinstance(self.visitor.views[0], - pvalue.ListPCollectionView)) + pvalue.AsList)) def test_co_group_by_key(self): emails = self.pipeline | 'email' >> Create([('joe', 'j...@example.com')]) http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/evaluation_context.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 7a7f318..8114104 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -50,12 +50,13 @@ class _SideInputView(object): def __init__(self, view): self._view = view self.callable_queue = collections.deque() + self.elements = [] self.value = None self.has_result = False class _SideInputsContainer(object): - """An in-process container for PCollectionViews. + """An in-process container for side inputs. It provides methods for blocking until a side-input is available and writing to a side input. @@ -67,21 +68,28 @@ class _SideInputsContainer(object): for view in views: self._views[view] = _SideInputView(view) - def get_value_or_schedule_after_output(self, pcollection_view, task): + def get_value_or_schedule_after_output(self, side_input, task): with self._lock: - view = self._views[pcollection_view] + view = self._views[side_input] if not view.has_result: view.callable_queue.append(task) task.blocked = True return (view.has_result, view.value) - def set_value_and_get_callables(self, pcollection_view, values): + def add_values(self, side_input, values): with self._lock: - view = self._views[pcollection_view] + view = self._views[side_input] + assert not view.has_result + view.elements.extend(values) + + def finalize_value_and_get_tasks(self, side_input): + with self._lock: + view = self._views[side_input] assert not view.has_result assert view.value is None assert view.callable_queue is not None - view.value = self._pvalue_to_value(pcollection_view, values) + view.value = self._pvalue_to_value(side_input, view.elements) + view.elements = None result = tuple(view.callable_queue) for task in result: task.blocked = False @@ -90,10 +98,10 @@ class _SideInputsContainer(object): 
return result def _pvalue_to_value(self, view, values): - """Given a PCollectionView, returns the associated value in requested form. + """Given a side input view, returns the associated value in requested form. Args: - view: PCollectionView for the requested side input. + view: SideInput for the requested side input. values: Iterable values associated with the side input. Returns: @@ -115,9 +123,9 @@ class EvaluationContext(object): EvaluationContext contains shared state for an execution of the DirectRunner that can be used while evaluating a PTransform. This consists of views into underlying state and watermark implementations, access - to read and write PCollectionViews, and constructing counter sets and + to read and write side inputs, and constructing counter sets and execution contexts. This includes executing callbacks asynchronously when - state changes to the appropriate point (e.g. when a PCollectionView is + state changes to the appropriate point (e.g. when a side input is requested and known to be empty). EvaluationContext also handles results by committing finalizing @@ -134,6 +142,9 @@ class EvaluationContext(object): self._value_to_consumers = value_to_consumers self._step_names = step_names self.views = views + self._pcollection_to_views = collections.defaultdict(list) + for view in views: + self._pcollection_to_views[view.pvalue].append(view) # AppliedPTransform -> Evaluator specific state objects self._application_state_interals = {} @@ -198,17 +209,20 @@ class EvaluationContext(object): # If the result is for a view, update side inputs container. if (result.output_bundles - and result.output_bundles[0].pcollection in self.views): - if committed_bundles: - assert len(committed_bundles) == 1 - # side_input must be materialized. 
- side_input_result = committed_bundles[0].get_elements_iterable( - make_copy=True) - else: - side_input_result = [] - tasks = self._side_inputs_container.set_value_and_get_callables( - result.output_bundles[0].pcollection, side_input_result) - self._pending_unblocked_tasks.extend(tasks) + and result.output_bundles[0].pcollection + in self._pcollection_to_views): + for view in self._pcollection_to_views[ + result.output_bundles[0].pcollection]: + for committed_bundle in committed_bundles: + # side_input must be materialized. + self._side_inputs_container.add_values( + view, + committed_bundle.get_elements_iterable(make_copy=True)) + if (self.get_execution_context(result.transform) + .watermarks.input_watermark + == WatermarkManager.WATERMARK_POS_INF): + self._pending_unblocked_tasks.extend( + self._side_inputs_container.finalize_value_and_get_tasks(view)) if result.counters: for counter in result.counters: @@ -277,7 +291,7 @@ class EvaluationContext(object): tw = self._watermark_manager.get_watermarks(transform) return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF - def get_value_or_schedule_after_output(self, pcollection_view, task): + def get_value_or_schedule_after_output(self, side_input, task): assert isinstance(task, TransformExecutor) return self._side_inputs_container.get_value_or_schedule_after_output( - pcollection_view, task) + side_input, task) http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 27b6f2f..ce6356c 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -76,6 +76,7 @@ class ExecutorService(object): return None def run(self): + while not self.shutdown_requested: task = self._get_task_or_none() if task: @@ -249,9 
+250,9 @@ class _TimerCompletionCallback(_CompletionCallback): class TransformExecutor(ExecutorService.CallableTask): """TransformExecutor will evaluate a bundle using an applied ptransform. - A CallableTask responsible for constructing a TransformEvaluator andevaluating - it on some bundle of input, and registering the result using the completion - callback. + A CallableTask responsible for constructing a TransformEvaluator and + evaluating it on some bundle of input, and registering the result using the + completion callback. """ def __init__(self, transform_evaluator_registry, evaluation_context, http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index f9a0692..0c35d99 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -31,7 +31,6 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager from apache_beam.runners.direct.transform_result import TransformResult from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access from apache_beam.transforms import core -from apache_beam.transforms import sideinputs from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn @@ -54,7 +53,6 @@ class TransformEvaluatorRegistry(object): core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator, - sideinputs.CreatePCollectionView: _CreatePCollectionViewEvaluator, _NativeWrite: _NativeWriteEvaluator, } @@ -100,8 +98,7 @@ class TransformEvaluatorRegistry(object): True if executor 
should execute applied_ptransform serially. """ return isinstance(applied_ptransform.transform, - (core.GroupByKeyOnly, sideinputs.CreatePCollectionView, - _NativeWrite)) + (core.GroupByKeyOnly, _NativeWrite)) class _TransformEvaluator(object): @@ -444,52 +441,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): self._applied_ptransform, bundles, state, None, None, hold) -class _CreatePCollectionViewEvaluator(_TransformEvaluator): - """TransformEvaluator for CreatePCollectionView transform.""" - - def __init__(self, evaluation_context, applied_ptransform, - input_committed_bundle, side_inputs, scoped_metrics_container): - assert not side_inputs - super(_CreatePCollectionViewEvaluator, self).__init__( - evaluation_context, applied_ptransform, input_committed_bundle, - side_inputs, scoped_metrics_container) - - @property - def _is_final_bundle(self): - return (self._execution_context.watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF) - - def start_bundle(self): - # state: [values] - self.state = (self._execution_context.existing_state - if self._execution_context.existing_state else []) - - assert len(self._outputs) == 1 - self.output_pcollection = list(self._outputs)[0] - - def process_element(self, element): - self.state.append(element) - - def finish_bundle(self): - if self._is_final_bundle: - bundle = self._evaluation_context.create_bundle(self.output_pcollection) - - view_result = self.state - for result in view_result: - bundle.output(result) - - bundles = [bundle] - state = None - hold = WatermarkManager.WATERMARK_POS_INF - else: - bundles = [] - state = self.state - hold = WatermarkManager.WATERMARK_NEG_INF - - return TransformResult( - self._applied_ptransform, bundles, state, None, None, hold) - - class _NativeWriteEvaluator(_TransformEvaluator): """TransformEvaluator for _NativeWrite transform.""" http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/core.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 7a52828..88fdec8 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -879,7 +879,7 @@ class CombineGlobally(PTransform): else CombineFn.from_callable(self.fn)) default_value = combine_fn.apply([], *self.args, **self.kwargs) else: - default_value = pvalue._SINGLETON_NO_DEFAULT # pylint: disable=protected-access + default_value = pvalue.AsSingleton._NO_DEFAULT # pylint: disable=protected-access view = pvalue.AsSingleton(combined, default_value=default_value) if self.as_view: return view http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index aca5822..93d751d 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -463,7 +463,7 @@ class PTransformWithSideInputs(PTransform): 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( - args, kwargs, pvalue.PCollectionView) + args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form '<function <lambda> at ...>' @@ -519,7 +519,7 @@ class PTransformWithSideInputs(PTransform): args, kwargs = self.raw_side_inputs def element_type(side_input): - if isinstance(side_input, pvalue.PCollectionView): + if isinstance(side_input, pvalue.AsSideInput): return side_input.element_type else: return instance_to_type(side_input) http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 46731bf..1de7bac 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -24,140 +24,8 @@ AsSingleton, AsIter, AsList and AsDict in apache_beam.pvalue. from __future__ import absolute_import -from apache_beam import pvalue -from apache_beam import typehints -from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms import window -# Type variables -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') - - -class CreatePCollectionView(PTransform): - """Transform to materialize a given PCollectionView in the pipeline. - - Important: this transform is an implementation detail and should not be used - directly by pipeline writers. - """ - - def __init__(self, view): - self.view = view - super(CreatePCollectionView, self).__init__() - - def infer_output_type(self, input_type): - # TODO(ccy): Figure out if we want to create a new type of type hint, i.e., - # typehints.View[...]. - return input_type - - def expand(self, pcoll): - return self.view - - -class ViewAsSingleton(PTransform): - """Transform to view PCollection as a singleton PCollectionView. - - Important: this transform is an implementation detail and should not be used - directly by pipeline writers. Use pvalue.AsSingleton(...) instead. 
- """ - - def __init__(self, has_default, default_value, label=None): - if label: - label = 'ViewAsSingleton(%s)' % label - super(ViewAsSingleton, self).__init__(label=label) - self.has_default = has_default - self.default_value = default_value - - def expand(self, pcoll): - self._check_pcollection(pcoll) - input_type = pcoll.element_type - output_type = input_type - return (pcoll - | CreatePCollectionView( - pvalue.SingletonPCollectionView( - pcoll.pipeline, self.has_default, self.default_value, - default_window_mapping_fn(pcoll.windowing.windowfn))) - .with_input_types(input_type) - .with_output_types(output_type)) - - -class ViewAsIterable(PTransform): - """Transform to view PCollection as an iterable PCollectionView. - - Important: this transform is an implementation detail and should not be used - directly by pipeline writers. Use pvalue.AsIter(...) instead. - """ - - def __init__(self, label=None): - if label: - label = 'ViewAsIterable(%s)' % label - super(ViewAsIterable, self).__init__(label=label) - - def expand(self, pcoll): - self._check_pcollection(pcoll) - input_type = pcoll.element_type - output_type = typehints.Iterable[input_type] - return (pcoll - | CreatePCollectionView( - pvalue.IterablePCollectionView( - pcoll.pipeline, - default_window_mapping_fn(pcoll.windowing.windowfn))) - .with_input_types(input_type) - .with_output_types(output_type)) - - -class ViewAsList(PTransform): - """Transform to view PCollection as a list PCollectionView. - - Important: this transform is an implementation detail and should not be used - directly by pipeline writers. Use pvalue.AsList(...) instead. 
- """ - - def __init__(self, label=None): - if label: - label = 'ViewAsList(%s)' % label - super(ViewAsList, self).__init__(label=label) - - def expand(self, pcoll): - self._check_pcollection(pcoll) - input_type = pcoll.element_type - output_type = typehints.List[input_type] - return (pcoll - | CreatePCollectionView(pvalue.ListPCollectionView( - pcoll.pipeline, - default_window_mapping_fn(pcoll.windowing.windowfn))) - .with_input_types(input_type) - .with_output_types(output_type)) - - -@typehints.with_input_types(typehints.Tuple[K, V]) -@typehints.with_output_types(typehints.Dict[K, V]) -class ViewAsDict(PTransform): - """Transform to view PCollection as a dict PCollectionView. - - Important: this transform is an implementation detail and should not be used - directly by pipeline writers. Use pvalue.AsDict(...) instead. - """ - - def __init__(self, label=None): - if label: - label = 'ViewAsDict(%s)' % label - super(ViewAsDict, self).__init__(label=label) - - def expand(self, pcoll): - self._check_pcollection(pcoll) - input_type = pcoll.element_type - key_type, value_type = ( - typehints.trivial_inference.key_value_types(input_type)) - output_type = typehints.Dict[key_type, value_type] - return (pcoll - | CreatePCollectionView( - pvalue.DictPCollectionView( - pcoll.pipeline, - default_window_mapping_fn(pcoll.windowing.windowfn))) - .with_input_types(input_type) - .with_output_types(output_type)) - # Top-level function so we can identify it later. 
def _global_window_mapping_fn(w, global_window=window.GlobalWindow()): http://git-wip-us.apache.org/repos/asf/beam/blob/207de81b/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 9278f4b..53669de 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -186,8 +186,8 @@ class SideInputsTest(unittest.TestCase): main_input = pipeline | 'main input' >> beam.Create([1]) side_list = pipeline | 'side list' >> beam.Create(a_list) side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs) - results = main_input | 'concatenate' >> beam.FlatMap( - lambda x, the_list, the_dict: [[x, the_list, the_dict]], + results = main_input | 'concatenate' >> beam.Map( + lambda x, the_list, the_dict: [x, the_list, the_dict], beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs)) def matcher(expected_elem, expected_list, expected_pairs): @@ -205,13 +205,13 @@ class SideInputsTest(unittest.TestCase): def test_as_singleton_without_unique_labels(self): # This should succeed as calling beam.pvalue.AsSingleton on the same # PCollection twice with the same defaults will return the same - # PCollectionView. + # view. 
a_list = [2] pipeline = self.create_pipeline() main_input = pipeline | 'main input' >> beam.Create([1]) side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], + results = main_input | beam.Map( + lambda x, s1, s2: [x, s1, s2], beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list)) def matcher(expected_elem, expected_singleton): @@ -226,34 +226,15 @@ class SideInputsTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_without_unique_labels(self): - # This should fail as beam.pvalue.AsSingleton with distinct default values - # should beam.Create distinct PCollectionViews with the same full_label. - a_list = [2] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - - with self.assertRaises(RuntimeError) as e: - _ = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton(side_list), - beam.pvalue.AsSingleton(side_list, default_value=3)) - self.assertTrue( - e.exception.message.startswith( - 'Transform "ViewAsSingleton(side list.None)" does not have a ' - 'stable unique label.')) - - @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_with_unique_labels(self): + def test_as_singleton_with_different_defaults(self): a_list = [] pipeline = self.create_pipeline() main_input = pipeline | 'main input' >> beam.Create([1]) side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton(side_list, default_value=2, label='si1'), - beam.pvalue.AsSingleton(side_list, default_value=3, label='si2')) + results = main_input | beam.Map( + lambda x, s1, s2: [x, s1, s2], + beam.pvalue.AsSingleton(side_list, default_value=2), + beam.pvalue.AsSingleton(side_list, default_value=3)) def 
matcher(expected_elem, expected_singleton1, expected_singleton2): def match(actual): @@ -267,15 +248,15 @@ class SideInputsTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') - def test_as_list_without_unique_labels(self): + def test_as_list_twice(self): # This should succeed as calling beam.pvalue.AsList on the same - # PCollection twice will return the same PCollectionView. + # PCollection twice will return the same view. a_list = [1, 2, 3] pipeline = self.create_pipeline() main_input = pipeline | 'main input' >> beam.Create([1]) side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, ls1, ls2: [[x, ls1, ls2]], + results = main_input | beam.Map( + lambda x, ls1, ls2: [x, ls1, ls2], beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list)) def matcher(expected_elem, expected_list): @@ -290,37 +271,15 @@ class SideInputsTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') - def test_as_list_with_unique_labels(self): - a_list = [1, 2, 3] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, ls1, ls2: [[x, ls1, ls2]], - beam.pvalue.AsList(side_list), - beam.pvalue.AsList(side_list, label='label')) - - def matcher(expected_elem, expected_list): - def match(actual): - [[actual_elem, actual_list1, actual_list2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list1) - equal_to(expected_list)(actual_list2) - return match - - assert_that(results, matcher(1, [1, 2, 3])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_dict_with_unique_labels(self): + def test_as_dict_twice(self): some_kvs = [('a', 1), ('b', 2)] pipeline = self.create_pipeline() main_input = pipeline | 'main input' >> beam.Create([1]) side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs) - results = main_input | beam.FlatMap( 
- lambda x, dct1, dct2: [[x, dct1, dct2]], + results = main_input | beam.Map( + lambda x, dct1, dct2: [x, dct1, dct2], beam.pvalue.AsDict(side_kvs), - beam.pvalue.AsDict(side_kvs, label='label')) + beam.pvalue.AsDict(side_kvs)) def matcher(expected_elem, expected_kvs): def match(actual): @@ -333,6 +292,20 @@ class SideInputsTest(unittest.TestCase): assert_that(results, matcher(1, some_kvs)) pipeline.run() + @attr('ValidatesRunner') + def test_flattened_side_input(self): + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([None]) + side_input = ( + pipeline | 'side1' >> beam.Create(['a']), + pipeline | 'side2' >> beam.Create(['b'])) | beam.Flatten() + results = main_input | beam.FlatMap( + lambda _, ab: ab, + beam.pvalue.AsList(side_input)) + + assert_that(results, equal_to(['a', 'b'])) + pipeline.run() + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG)