[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411446 ]
ASF GitHub Bot logged work on BEAM-9608: ---------------------------------------- Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570237 ########## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ########## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ - :param worker_handler_factory: A ``callable`` that takes in an environment + :param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] - self.worker_handler_factory = worker_handler_factory + self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders + self.pipeline_context = pipeline_context.PipelineContext( + self.pipeline_components, + iterable_state_write=self._iterable_state_write) + self._last_uid = -1 + + def next_uid(self): + self._last_uid += 1 + return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): + # type: (...) 
-> bytes + token = unique_name(None, 'iter').encode('ascii') + out = create_OutputStream() + for element in values: + element_coder_impl.encode_to_stream(element, out, True) + self.worker_handler_manager.state_servicer.append_raw( + beam_fn_api_pb2.StateKey( + runner=beam_fn_api_pb2.StateKey.Runner(key=token)), + out.get()) + return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context - self.process_bundle_descriptor = process_bundle_descriptor - self.worker_handler = worker_handler - self.pipeline_context = p_context + self.stage = stage + self.bundle_uid = self.execution_context.next_uid() + self.num_workers = num_workers + + # Properties that are lazily initialized + self._process_bundle_descriptor = None + self._worker_handlers = None + + @property + def worker_handlers(self): + if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ + .get_worker_handlers(self.stage.environment, self.num_workers) + return self._worker_handlers + + def data_api_service_descriptor(self): + # All worker_handlers share the same grpc server, so we can read grpc server + # info from any worker_handler and read from the first worker_handler. + return next(iter(self.worker_handlers)).data_api_service_descriptor() Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 411446)
Time Spent: 1h 50m (was: 1h 40m)

> Add context managers for FnApiRunner to manage execution of each bundle
> -----------------------------------------------------------------------
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)