Wrap unknown coders in LengthPrefixCoder.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08a44874
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08a44874
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08a44874

Branch: refs/heads/master
Commit: 08a448743e3b53e055d0ccf1983b5d128c5c0692
Parents: e6d5e08
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Thu Aug 24 11:01:20 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Mon Aug 28 10:03:52 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coders.py        | 10 ++
 .../runners/portability/fn_api_runner.py        | 99 ++++++++++++++++++--
 2 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index e204369..10fb07b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -707,6 +707,16 @@ class TupleCoder(FastCoder):
   def __hash__(self):
     return hash(self._coders)
 
+  def to_runner_api_parameter(self, context):
+    if self.is_kv_coder():
+      return urns.KV_CODER, None, self.coders()
+    else:
+      return super(TupleCoder, self).to_runner_api_parameter(context)
+
+  @Coder.register_urn(urns.KV_CODER, None)
+  def from_runner_api_parameter(unused_payload, components, unused_context):
+    return TupleCoder(components)
+
 
 class TupleSequenceCoder(FastCoder):
   """Coder of homogeneous tuple objects."""

http://git-wip-us.apache.org/repos/asf/beam/blob/08a44874/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 7c0c06f..c9b3d9a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -122,7 +122,7 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
 class _GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
   def __init__(self, pre_grouped_coder, post_grouped_coder):
-    self._key_coder = pre_grouped_coder.value_coder().key_coder()
+    self._key_coder = pre_grouped_coder.key_coder()
     self._pre_grouped_coder = pre_grouped_coder
     self._post_grouped_coder = post_grouped_coder
     self._table = collections.defaultdict(list)
@@ -249,13 +249,80 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
     # Now define the "optimization" phases.
 
+    safe_coders = {}
+
     def expand_gbk(stages):
       """Transforms each GBK into a write followed by a read.
       """
+      good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([
+          urns.PICKLED_CODER])
+      coders = pipeline_components.coders
+
+      for coder_id, coder_proto in coders.items():
+        if coder_proto.spec.spec.urn == urns.BYTES_CODER:
+          bytes_coder_id = coder_id
+          break
+      else:
+        bytes_coder_id = unique_name(coders, 'bytes_coder')
+        pipeline_components.coders[bytes_coder_id].CopyFrom(
+            beam.coders.BytesCoder().to_runner_api(None))
+
+      coder_substitutions = {}
+
+      def wrap_unknown_coders(coder_id, with_bytes):
+        if (coder_id, with_bytes) not in coder_substitutions:
+          wrapped_coder_id = None
+          coder_proto = coders[coder_id]
+          if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER:
+            coder_substitutions[coder_id, with_bytes] = (
+                bytes_coder_id if with_bytes else coder_id)
+          elif coder_proto.spec.spec.urn in good_coder_urns:
+            wrapped_components = [wrap_unknown_coders(c, with_bytes)
+                                  for c in coder_proto.component_coder_ids]
+            if wrapped_components == list(coder_proto.component_coder_ids):
+              # Use as is.
+              coder_substitutions[coder_id, with_bytes] = coder_id
+            else:
+              wrapped_coder_id = unique_name(
+                  coders,
+                  coder_id + ("_bytes" if with_bytes else "_len_prefix"))
+              coders[wrapped_coder_id].CopyFrom(coder_proto)
+              coders[wrapped_coder_id].component_coder_ids[:] = [
+                  wrap_unknown_coders(c, with_bytes)
+                  for c in coder_proto.component_coder_ids]
+              coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+          else:
+            # Not a known coder.
+            if with_bytes:
+              coder_substitutions[coder_id, with_bytes] = bytes_coder_id
+            else:
+              wrapped_coder_id = unique_name(coders, coder_id + "_len_prefix")
+              len_prefix_coder_proto = beam_runner_api_pb2.Coder(
+                  spec=beam_runner_api_pb2.SdkFunctionSpec(
+                      spec=beam_runner_api_pb2.FunctionSpec(
+                          urn=urns.LENGTH_PREFIX_CODER)),
+                  component_coder_ids=[coder_id])
+              coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto)
+              coder_substitutions[coder_id, with_bytes] = wrapped_coder_id
+          # This operation is idempotent.
+          if wrapped_coder_id:
+            coder_substitutions[wrapped_coder_id, with_bytes] = wrapped_coder_id
+        return coder_substitutions[coder_id, with_bytes]
+
+      def fix_pcoll_coder(pcoll):
+        new_coder_id = wrap_unknown_coders(pcoll.coder_id, False)
+        safe_coders[new_coder_id] = wrap_unknown_coders(pcoll.coder_id, True)
+        pcoll.coder_id = new_coder_id
+
       for stage in stages:
         assert len(stage.transforms) == 1
         transform = stage.transforms[0]
         if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM:
+          for pcoll_id in transform.inputs.values():
+            fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+          for pcoll_id in transform.outputs.values():
+            fix_pcoll_coder(pipeline_components.pcollections[pcoll_id])
+
           # This is used later to correlate the read and write.
           param = str("group:%s" % stage.name)
           gbk_write = Stage(
@@ -547,9 +614,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
     logging.debug('Stages: %s', [str(s) for s in stages])
 
     # Return the (possibly mutated) context and ordered set of stages.
-    return pipeline_components, stages
+    return pipeline_components, stages, safe_coders
 
-  def run_stages(self, pipeline_components, stages, direct=True):
+  def run_stages(self, pipeline_components, stages, safe_coders, direct=True):
 
     if direct:
       controller = FnApiRunner.DirectController()
@@ -559,13 +626,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
     try:
       pcoll_buffers = collections.defaultdict(list)
       for stage in stages:
-        self.run_stage(controller, pipeline_components, stage, pcoll_buffers)
+        self.run_stage(
+            controller, pipeline_components, stage, pcoll_buffers, safe_coders)
     finally:
       controller.close()
 
     return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE)
 
-  def run_stage(self, controller, pipeline_components, stage, pcoll_buffers):
+  def run_stage(
+      self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
 
     coders = pipeline_context.PipelineContext(pipeline_components).coders
     data_operation_spec = controller.data_operation_spec()
@@ -666,10 +735,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                 original_gbk_transform]
             input_pcoll = only_element(transform_proto.inputs.values())
             output_pcoll = only_element(transform_proto.outputs.values())
-            pre_gbk_coder = coders[
-                pipeline_components.pcollections[input_pcoll].coder_id]
-            post_gbk_coder = coders[
-                pipeline_components.pcollections[output_pcoll].coder_id]
+            pre_gbk_coder = coders[safe_coders[
+                pipeline_components.pcollections[input_pcoll].coder_id]]
+            post_gbk_coder = coders[safe_coders[
+                pipeline_components.pcollections[output_pcoll].coder_id]]
             pcoll_buffers[pcoll_id] = _GroupingBuffer(
                 pre_gbk_coder, post_gbk_coder)
           pcoll_buffers[pcoll_id].append(output.data)
@@ -1000,3 +1069,15 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 def only_element(iterable):
   element, = iterable
   return element
+
+
+def unique_name(existing, prefix):
+  if prefix in existing:
+    counter = 0
+    while True:
+      counter += 1
+      prefix_counter = prefix + "_%s" % counter
+      if prefix_counter not in existing:
+        return prefix_counter
+  else:
+    return prefix