Repository: beam
Updated Branches:
  refs/heads/master e93c06485 -> 39074899a
Enable grpc controller in fn_api_runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8dd0077d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8dd0077d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8dd0077d Branch: refs/heads/master Commit: 8dd0077d2a58e278b11c7e7eb4b5f182e1400992 Parents: e93c064 Author: Vikas Kedigehalli <vika...@google.com> Authored: Mon Jun 26 18:47:39 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue Jun 27 10:17:49 2017 -0700 ---------------------------------------------------------------------- .../runners/portability/fn_api_runner.py | 12 +++++++--- .../runners/portability/fn_api_runner_test.py | 23 +++++++++++++++++++- .../apache_beam/runners/worker/sdk_worker.py | 2 +- 3 files changed, 32 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index a8e2eb4..c5438ad 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -174,12 +174,17 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): return {tag: pcollection_id(op_ix, out_ix) for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))} + def only_element(iterable): + element, = iterable + return element + for op_ix, (stage_name, operation) in enumerate(map_task): transform_id = uniquify(stage_name) if isinstance(operation, operation_specs.WorkerInMemoryWrite): # Write this data back to the runner. 
- runner_sinks[(transform_id, 'out')] = operation + target_name = only_element(get_inputs(operation).keys()) + runner_sinks[(transform_id, target_name)] = operation transform_spec = beam_runner_api_pb2.FunctionSpec( urn=sdk_worker.DATA_OUTPUT_URN, parameter=proto_utils.pack_Any(data_operation_spec)) @@ -190,7 +195,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): maptask_executor_runner.InMemorySource) and isinstance(operation.source.source.default_output_coder(), WindowedValueCoder)): - input_data[(transform_id, 'input')] = self._reencode_elements( + target_name = only_element(get_outputs(op_ix).keys()) + input_data[(transform_id, target_name)] = self._reencode_elements( operation.source.source.read(None), operation.source.source.default_output_coder()) transform_spec = beam_runner_api_pb2.FunctionSpec( @@ -309,7 +315,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): sink_op.output_buffer.append(e) return - def execute_map_tasks(self, ordered_map_tasks, direct=True): + def execute_map_tasks(self, ordered_map_tasks, direct=False): if direct: controller = FnApiRunner.DirectController() else: http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 9159035..163e980 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -21,6 +21,8 @@ import unittest import apache_beam as beam from apache_beam.runners.portability import fn_api_runner from apache_beam.runners.portability import maptask_executor_runner_test +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class FnApiRunnerTest( @@ -31,9 +33,28 @@ 
class FnApiRunnerTest( runner=fn_api_runner.FnApiRunner()) def test_combine_per_key(self): - # TODO(robertwb): Implement PGBKCV operation. + # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API. pass + def test_combine_per_key(self): + # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API. + pass + + def test_pardo_side_inputs(self): + # TODO(BEAM-1348): Enable once side inputs are supported in fn API. + pass + + def test_pardo_unfusable_side_inputs(self): + # TODO(BEAM-1348): Enable once side inputs are supported in fn API. + pass + + def test_assert_that(self): + # TODO: figure out a way for fn_api_runner to parse and raise the + # underlying exception. + with self.assertRaisesRegexp(RuntimeError, 'BeamAssertException'): + with self.create_pipeline() as p: + assert_that(p | beam.Create(['a', 'b']), equal_to(['a'])) + # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner http://git-wip-us.apache.org/repos/asf/beam/blob/8dd0077d/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6a366eb..e1ddfb7 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -415,7 +415,7 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): def create(factory, transform_id, transform_proto, grpc_port, consumers): target = beam_fn_api_pb2.Target( primitive_transform_reference=transform_id, - name='out') + name=only_element(transform_proto.inputs.keys())) return DataOutputOperation( transform_proto.unique_name, transform_proto.unique_name,