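DirectPipelineRunner bug fixes. - Run pipelines applied to empty input (e.g. `[] | CombineGlobally(max).without_defaults()`) to completion instead of reporting a stall. - Use pickler to serialize/deserialize DoFns (and sinks) instead of deepcopy, similar to the other execution environments.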
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58 Branch: refs/heads/python-sdk Commit: 6f93cd5884797c0880766c7737e106765becf96d Parents: 778194f Author: Ahmet Altay <al...@google.com> Authored: Thu Nov 10 17:37:45 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Nov 11 00:23:45 2016 -0800 ---------------------------------------------------------------------- .../examples/snippets/snippets_test.py | 14 ++++++--- sdks/python/apache_beam/pipeline_test.py | 5 +++ .../apache_beam/runners/direct/direct_runner.py | 2 +- .../apache_beam/runners/direct/executor.py | 33 ++++++++++++-------- .../runners/direct/transform_evaluator.py | 8 +++-- 5 files changed, 41 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index edc0a17..72fccb2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -29,6 +29,8 @@ from apache_beam import io from apache_beam import pvalue from apache_beam import typehints from apache_beam.io import fileio +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to from apache_beam.utils.options import TypeOptions from apache_beam.examples.snippets import snippets @@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_on] def test_deterministic_key(self): - 
lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'] + p = beam.Pipeline('DirectPipelineRunner') + lines = (p | beam.Create( + ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) # [START type_hints_deterministic_key] class Player(object): @@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase): beam.typehints.Tuple[Player, int])) # [END type_hints_deterministic_key] - self.assertEquals( - {('banana', 3), ('kiwi', 4), ('zucchini', 3)}, - set(totals | beam.Map(lambda (k, v): (k.name, v)))) + assert_that( + totals | beam.Map(lambda (k, v): (k.name, v)), + equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) + + p.run() class SnippetsTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index a4c983f..013796c 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.dataflow.native_io.iobase import NativeSource +from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Map @@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase): pipeline.run() + def test_aggregator_empty_input(self): + actual = [] | CombineGlobally(max).without_defaults() + self.assertEqual(actual, []) + def test_pipeline_as_context(self): def raise_exception(exn): raise exn http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff 
--git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2e5fe74..1afd486 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -125,7 +125,7 @@ class BufferingInMemoryCache(object): for key, value in self._cache.iteritems(): applied_ptransform, tag = key self._pvalue_cache.cache_output(applied_ptransform, tag, value) - self._cache = None + self._cache = None class DirectPipelineResult(PipelineResult): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 0f1c53b..378aecf 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -202,7 +202,7 @@ class TransformExecutorServices(object): if not cached: cached = SerialEvaluationState(self._executor_service, self._scheduled) self._serial_cache[step] = cached - return cached + return cached @property def executors(self): @@ -480,18 +480,20 @@ class _ExecutorServiceParallelExecutor(object): Otherwise monitor task should schedule itself again for future execution. """ - if self._executor.evaluation_context.is_done(): - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate()) - self._executor.executor_service.shutdown() - return True - elif not self._is_executing: - self._executor.visible_updates.offer( - _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( - Exception('Monitor task detected a pipeline stall.'))) + if self._is_executing(): + # There are some bundles still in progress. 
+ return False + else: + if self._executor.evaluation_context.is_done(): + self._executor.visible_updates.offer( + _ExecutorServiceParallelExecutor.VisibleExecutorUpdate()) + else: + # Nothing is scheduled for execution, but watermarks incomplete. + self._executor.visible_updates.offer( + _ExecutorServiceParallelExecutor.VisibleExecutorUpdate( + Exception('Monitor task detected a pipeline stall.'))) self._executor.executor_service.shutdown() return True - return False def _fire_timers(self): """Schedules triggered consumers if any timers fired. @@ -515,8 +517,13 @@ class _ExecutorServiceParallelExecutor(object): def _is_executing(self): """Returns True if there is at least one non-blocked TransformExecutor.""" - for transform_executor in ( - self._executor.transform_executor_services.executors): + executors = self._executor.transform_executor_services.executors + if not executors: + # Nothing is executing. + return False + + # Ensure that at least one of those executors is not blocked. + for transform_executor in executors: if not transform_executor.blocked: return True return False http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index c732d7f..093f183 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -20,10 +20,10 @@ from __future__ import absolute_import import collections -import copy from apache_beam import coders from apache_beam import pvalue +from apache_beam.internal import pickler import apache_beam.io as io from apache_beam.runners.common import DoFnRunner from apache_beam.runners.common import DoFnState @@ -338,7 +338,8 @@ class _ParDoEvaluator(_TransformEvaluator): 
self._counter_factory = counters.CounterFactory() - dofn = copy.deepcopy(transform.dofn) + # TODO(aaltay): Consider storing the serialized form as an optimization. + dofn = pickler.loads(pickler.dumps(transform.dofn)) pipeline_options = self._evaluation_context.pipeline_options if (pipeline_options is not None @@ -504,7 +505,8 @@ class _NativeWriteEvaluator(_TransformEvaluator): side_inputs) assert applied_ptransform.transform.sink - self._sink = copy.deepcopy(applied_ptransform.transform.sink) + # TODO(aaltay): Consider storing the serialized form as an optimization. + self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink)) @property def _is_final_bundle(self):