Runner API context helper classes.
# Provenance (taken from the commit notification this excerpt came from):
#   Project: http://git-wip-us.apache.org/repos/asf/beam/repo
#   Commit:  http://git-wip-us.apache.org/repos/asf/beam/commit/bc76a186
#   Tree:    http://git-wip-us.apache.org/repos/asf/beam/tree/bc76a186
#   Diff:    http://git-wip-us.apache.org/repos/asf/beam/diff/bc76a186
#   Branch:  refs/heads/gearpump-runner
#   Commit:  bc76a186099568ef292ceb007388ae7174150bc2
#   Parents: 3bb125e
#   Author:    Robert Bradshaw <rober...@gmail.com>
#   Authored:  Tue Mar 7 12:04:27 2017 -0800
#   Committer: Robert Bradshaw <rober...@gmail.com>
#   Committed: Thu Mar 9 20:29:00 2017 -0800
#   File: sdks/python/apache_beam/pipeline.py
#         (1 file changed, 62 insertions; index 7db39a9..4ec2e47)
#
# The same commit adds the following imports to the top of pipeline.py. The
# classes below rely on them, on the already-imported ``pvalue`` module, and
# on AppliedPTransform, which is defined earlier in that module:
#   from apache_beam import coders
#   from apache_beam.runners.api import beam_runner_api_pb2
#   from apache_beam.transforms import core


class PipelineContextMap(object):
  """This is a bi-directional map between objects and ids.

  Under the hood it encodes and decodes these objects into runner API
  representations.

  An object is encoded with ``obj.to_runner_api(context)`` the first time an
  id is requested for it, and decoded with
  ``obj_type.from_runner_api(proto, context)`` the first time it is looked up
  by id. Both directions are cached.
  """

  def __init__(self, context, obj_type, proto_map=None):
    """Creates a map holding objects of a single type.

    Args:
      context: the owning context; it is passed through to to_runner_api and
        from_runner_api.
      obj_type: class of the stored objects. It must provide from_runner_api,
        and its __name__ is embedded in generated ids.
      proto_map: optional existing id -> proto mapping to decode from. A
        missing or empty mapping starts the map empty.
    """
    self._pipeline_context = context
    self._obj_type = obj_type
    self._obj_to_id = {}
    self._id_to_obj = {}
    # Take a private dict copy: get_id() stores new entries here, which must
    # neither mutate the caller's mapping nor hit the item-assignment
    # restriction of message-valued protobuf maps.
    self._id_to_proto = dict(proto_map) if proto_map else {}
    self._counter = 0

  def _unique_ref(self):
    """Returns a fresh id of the form "ref_<TypeName>_<n>"."""
    self._counter += 1
    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)

  def populate_map(self, proto_map):
    """Copies every known id -> proto entry into proto_map.

    This covers protos produced by get_id() as well as protos supplied at
    construction time that have not been decoded yet.

    Args:
      proto_map: destination protobuf map field with message values, such as
        a field of beam_runner_api_pb2.Components.
    """
    for ref_id, proto in self._id_to_proto.items():
      # Message-valued protobuf maps reject ``m[key] = msg``; indexing creates
      # the entry, which is then filled in place.
      proto_map[ref_id].CopyFrom(proto)

  def get_id(self, obj):
    """Returns the id for obj, registering and encoding it on first use.

    Args:
      obj: a hashable object providing to_runner_api(context).

    Returns:
      The id string associated with obj.
    """
    if obj not in self._obj_to_id:
      ref_id = self._unique_ref()
      self._id_to_obj[ref_id] = obj
      self._obj_to_id[obj] = ref_id
      self._id_to_proto[ref_id] = obj.to_runner_api(self._pipeline_context)
    return self._obj_to_id[obj]

  def get_by_id(self, id):
    """Returns the object for id, decoding it from its proto on first use.

    Args:
      id: an id previously returned by get_id() or present in the proto_map
        given at construction time.

    Returns:
      The object associated with id.

    Raises:
      KeyError: if id is unknown to this map.
    """
    if id not in self._id_to_obj:
      self._id_to_obj[id] = self._obj_type.from_runner_api(
          self._id_to_proto[id], self._pipeline_context)
    return self._id_to_obj[id]


class PipelineContext(object):
  """Groups one PipelineContextMap per kind of pipeline component.

  Every key of _COMPONENT_TYPES becomes an instance attribute of the same
  name holding the PipelineContextMap for that component type.
  """

  # Attribute name -> class of the objects stored under that attribute.
  _COMPONENT_TYPES = {
      'transforms': AppliedPTransform,
      'pcollections': pvalue.PCollection,
      'coders': coders.Coder,
      'windowing_strategies': core.Windowing,
      # TODO: environment
  }

  def __init__(self, context_proto=None):
    """Builds the per-component maps.

    Args:
      context_proto: optional proto whose attributes named after the keys of
        _COMPONENT_TYPES seed the corresponding maps. Missing attributes (or
        no proto at all) yield empty maps.
    """
    for name, cls in self._COMPONENT_TYPES.items():
      setattr(self, name,
              PipelineContextMap(self, cls, getattr(context_proto, name, None)))

  def to_runner_api(self):
    """Returns a beam_runner_api_pb2.Components filled from all maps."""
    context_proto = beam_runner_api_pb2.Components()
    # Only the attribute names are needed here; the classes are irrelevant
    # when serializing.
    for name in self._COMPONENT_TYPES:
      getattr(self, name).populate_map(getattr(context_proto, name))
    return context_proto