Repository: beam Updated Branches: refs/heads/master b80aac5e3 -> e9cd41165
Implement combiner lifting for direct runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87 Branch: refs/heads/master Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1 Parents: b80aac5 Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Jan 31 11:41:09 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Jan 31 15:35:21 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/runners/direct/direct_runner.py | 15 +++- .../runners/direct/helper_transforms.py | 77 ++++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index dc2668d..28dc012 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner): def __init__(self): self._cache = None + def apply_CombinePerKey(self, transform, pcoll): + # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems + # with resolving imports when they are at top. 
+ # pylint: disable=wrong-import-position + from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey + try: + return pcoll | LiftedCombinePerKey( + transform.fn, transform.args, transform.kwargs) + except NotImplementedError: + return transform.expand(pcoll) + def run(self, pipeline): """Execute the entire pipeline and returns an DirectPipelineResult.""" @@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner): self._cache = BufferingInMemoryCache() return self._cache.pvalue_cache - def apply(self, transform, input): # pylint: disable=redefined-builtin - """Runner callback for a pipeline.apply call.""" - return transform.expand(input) - class BufferingInMemoryCache(object): """PValueCache wrapper for buffering bundles until a PValue is fully computed. http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py new file mode 100644 index 0000000..340db75 --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import itertools

import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.internal.util import ArgumentPlaceholder


class LiftedCombinePerKey(beam.PTransform):
  """An implementation of CombinePerKey that does mapper-side pre-combining.

  The input is first partially combined per (key, window) pair, the partial
  accumulators are then grouped by key, and finally the grouped accumulators
  are merged and converted into the output value.
  """

  def __init__(self, combine_fn, args, kwargs):
    """Curries args and kwargs into combine_fn.

    Args:
      combine_fn: the combine function to apply per key.
      args: positional arguments to bind to combine_fn.
      kwargs: keyword arguments to bind to combine_fn.

    Raises:
      NotImplementedError: if any of args or kwargs is an ArgumentPlaceholder
        (a deferred side input), which this transform does not support.
    """
    if any(isinstance(arg, ArgumentPlaceholder)
           for arg in itertools.chain(args, kwargs.values())):
      # This isn't implemented in dataflow either...
      raise NotImplementedError('Deferred CombineFn side inputs.')
    self._combine_fn = beam.transforms.combiners.curry_combine_fn(
        combine_fn, args, kwargs)

  def expand(self, pcoll):
    """Applies partial combining, a GroupByKey, and the final merge to pcoll."""
    return (pcoll
            | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
            | beam.GroupByKey()
            | beam.ParDo(FinishCombine(self._combine_fn)))


class PartialGroupByKeyCombiningValues(beam.DoFn):
  """Aggregates values into a per-key-window cache.

  As bundles are in-memory-sized, we don't bother flushing until the very end.
  """

  def __init__(self, combine_fn):
    self._combine_fn = combine_fn

  def start_bundle(self, context):
    """Resets the cache; missing entries start from a fresh accumulator."""
    self._cache = collections.defaultdict(self._combine_fn.create_accumulator)

  def process(self, context):
    """Adds the element's value to the accumulator of each of its windows."""
    k, vi = context.element
    for w in context.windows:
      self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)

  def finish_bundle(self, context):
    """Emits one (key, accumulator) pair per cached (key, window) entry.

    Each output is constructed with the window's end as its second argument
    and is placed in that single window.
    """
    # NOTE: the debugging pprint of the whole cache was removed here; it wrote
    # every accumulator of every bundle to stdout.
    for (k, w), va in self._cache.items():
      yield WindowedValue((k, va), w.end, (w,))


class FinishCombine(beam.DoFn):
  """Merges partially combined results.
  """

  def __init__(self, combine_fn):
    self._combine_fn = combine_fn

  def process(self, context):
    """Merges the accumulators grouped under a key and extracts the output.

    Returns:
      A single-element list holding the (key, combined output) pair.
    """
    k, vs = context.element
    return [(
        k,
        self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]