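Actually test the fn_api_runner. The test suite was not being run due to a typo: FnApiRunnerTest subclassed the runner class (maptask_executor_runner.MapTaskExecutorRunner) instead of the test case (maptask_executor_runner_test.MapTaskExecutorRunnerTest), so it inherited no tests. Also fix the breakage that accumulated in the code in the meantime.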
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e71eb66a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e71eb66a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e71eb66a Branch: refs/heads/master Commit: e71eb66ae319bdf0cdad1fe9b54662962c8e8f16 Parents: 7126fdc Author: Robert Bradshaw <rober...@google.com> Authored: Fri Jun 9 16:44:55 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Jun 13 08:56:36 2017 -0700 ---------------------------------------------------------------------- .../runners/portability/fn_api_runner.py | 16 ++++++---------- .../runners/portability/fn_api_runner_test.py | 4 ++-- .../python/apache_beam/runners/worker/operations.py | 1 + .../python/apache_beam/runners/worker/sdk_worker.py | 2 +- 4 files changed, 10 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index a83eae4..8c213ad 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -179,7 +179,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): # into the sdk worker, or an injection of the source object into the # sdk worker as data followed by an SDF that reads that source. 
if (isinstance(operation.source.source, - worker_runner_base.InMemorySource) + maptask_executor_runner.InMemorySource) and isinstance(operation.source.source.default_output_coder(), WindowedValueCoder)): output_stream = create_OutputStream() @@ -264,11 +264,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): element_coder.get_impl().encode_to_stream( element, output_stream, True) elements_data = output_stream.get() - state_key = beam_fn_api_pb2.StateKey(function_spec_reference=view_id) + state_key = beam_fn_api_pb2.StateKey(key=view_id) state_handler.Clear(state_key) - state_handler.Append( - beam_fn_api_pb2.SimpleStateAppendRequest( - state_key=state_key, data=[elements_data])) + state_handler.Append(state_key, elements_data) elif isinstance(operation, operation_specs.WorkerFlatten): fn = sdk_worker.pack_function_spec_data( @@ -382,9 +380,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): return beam_fn_api_pb2.Elements.Data( data=''.join(self._all[self._to_key(state_key)])) - def Append(self, append_request): - self._all[self._to_key(append_request.state_key)].extend( - append_request.data) + def Append(self, state_key, data): + self._all[self._to_key(state_key)].extend(data) def Clear(self, state_key): try: @@ -394,8 +391,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): @staticmethod def _to_key(state_key): - return (state_key.function_spec_reference, state_key.window, - state_key.key) + return state_key.window, state_key.key class DirectController(object): """An in-memory controller for fn API control, state and data planes.""" http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 633602f..66d985a 100644 --- 
a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -20,10 +20,10 @@ import unittest import apache_beam as beam from apache_beam.runners.portability import fn_api_runner -from apache_beam.runners.portability import maptask_executor_runner +from apache_beam.runners.portability import maptask_executor_runner_test -class FnApiRunnerTest(maptask_executor_runner.MapTaskExecutorRunner): +class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest): def create_pipeline(self): return beam.Pipeline(runner=fn_api_runner.FnApiRunner()) http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/operations.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index a44561d..c4f945b 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -129,6 +129,7 @@ class Operation(object): self.operation_name + '-finish') # TODO(ccy): the '-abort' state can be added when the abort is supported in # Operations. 
+ self.scoped_metrics_container = None def start(self): """Start operation.""" http://git-wip-us.apache.org/repos/asf/beam/blob/e71eb66a/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 33f2b61..dc4f5c2 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -359,7 +359,7 @@ class SdkWorker(object): source=SideInputSource( self.state_handler, beam_fn_api_pb2.StateKey( - function_spec_reference=si.view_fn.id), + key=si.view_fn.id.encode('utf-8')), coder=unpack_and_deserialize_py_fn(si.view_fn))) output_tags = list(transform.outputs.keys()) spec = operation_specs.WorkerDoFn(