[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414259 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 19:41 Start Date: 01/Apr/20 19:41 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401863648 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,108 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment -id and a number of workers, and returns a list of ``WorkerHandler``s. +:param worker_handler_manager: This class manages the set of worker +handlers, and the communication with state / control APIs. 
:param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + @property + def state_servicer(self): +# TODO(BEAM-9625): Ensure FnApiRunnerExecutionContext owns StateServicer +return self.worker_handler_manager.state_servicer + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) -> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized 
+self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = ( + self.execution_context.worker_handler_manager.get_worker_handlers( + self.stage.environment, self.num_workers)) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return self.worker_handlers[0].data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return self.worker_handlers[0].state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): Review comment: I was also bitten by this. I am trying to think of a better way
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414260 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 19:41 Start Date: 01/Apr/20 19:41 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401863675 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs( def _run_stage(self, runner_execution_context, # type: execution.FnApiRunnerExecutionContext - stage, # type: translations.Stage + bundle_context_manager, # type: execution.BundleContextManager Review comment: I'm adding all these comments to https://github.com/apache/beam/pull/11270 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 414260) Time Spent: 3h 40m (was: 3.5h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414221 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 18:45 Start Date: 01/Apr/20 18:45 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401830220 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,108 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment -id and a number of workers, and returns a list of ``WorkerHandler``s. +:param worker_handler_manager: This class manages the set of worker +handlers, and the communication with state / control APIs. 
:param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + @property + def state_servicer(self): +# TODO(BEAM-9625): Ensure FnApiRunnerExecutionContext owns StateServicer +return self.worker_handler_manager.state_servicer + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) -> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized 
+self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = ( + self.execution_context.worker_handler_manager.get_worker_handlers( + self.stage.environment, self.num_workers)) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return self.worker_handlers[0].data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return self.worker_handlers[0].state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): Review comment: I was bitten by the fact that it is an error to access the pr
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414222&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414222 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 18:45 Start Date: 01/Apr/20 18:45 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401831550 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs( def _run_stage(self, runner_execution_context, # type: execution.FnApiRunnerExecutionContext - stage, # type: translations.Stage + bundle_context_manager, # type: execution.BundleContextManager Review comment: On this note, perhaps it makes sense to break FnApiRunner into the (mostly stateless) runner that can execute multiple pipelines and an executor (that has methods like run_stage) that might be stateful and is initialized with and tasked with running a single pipeline. Much of what is on context(s) would become state of self of this new object. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 414222) Time Spent: 3h 20m (was: 3h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 3h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414220 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 18:45 Start Date: 01/Apr/20 18:45 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401828861 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs( def _run_stage(self, runner_execution_context, # type: execution.FnApiRunnerExecutionContext - stage, # type: translations.Stage + bundle_context_manager, # type: execution.BundleContextManager Review comment: It's odd that _run_stage no longer takes as a parameter the stage to run. Perhaps bundle_context_manager (and its class?) should be named stage_context or similar? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 414220) Time Spent: 3h (was: 2h 50m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 3h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=413718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-413718 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 03:19 Start Date: 01/Apr/20 03:19 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 413718) Time Spent: 2h 50m (was: 2h 40m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=413641&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-413641 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 01/Apr/20 00:40 Start Date: 01/Apr/20 00:40 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r401293360 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: Link to JIRA for reference? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 413641) Time Spent: 2h 40m (was: 2.5h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=412675&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-412675 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 30/Mar/20 23:08 Start Date: 30/Mar/20 23:08 Worklog Time Spent: 10m Work Description: pabloem commented on issue #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#issuecomment-606298444 @robertwb thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 412675) Time Spent: 2.5h (was: 2h 20m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Fix For: 2.21.0 > > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411449&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411449 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:43 Start Date: 27/Mar/20 22:43 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570655 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing( cache_token_generator=cache_token_generator) testing_bundle_manager.process_bundle(data_input, data_output) finally: -worker_handler.state.restore() + runner_execution_context.worker_handler_manager.state_servicer.restore() def _collect_written_timers_and_add_to_deferred_inputs( self, - pipeline_components, # type: beam_runner_api_pb2.Components - stage, # type: translations.Stage + runner_execution_context, # type: execution.FnApiRunnerExecutionContext bundle_context_manager, # type: execution.BundleContextManager deferred_inputs, # type: MutableMapping[str, PartitionableBuffer] - data_channel_coders, # type: Mapping[str, str] ): # type: (...) -> None -for transform_id, timer_writes in stage.timer_pcollections: +for transform_id, timer_writes in \ Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411449) Time Spent: 2h 20m (was: 2h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411448 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:43 Start Date: 27/Mar/20 22:43 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570601 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: I've just added the state_servicer() from your first comment, but I've created a jira issue to track fixing that later on. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411448) Time Spent: 2h 10m (was: 2h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411444 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570204 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411444) Time Spent: 1.5h (was: 1h 20m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411447 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570251 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. 
+return next(iter(self.worker_handlers)).state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): +res = beam_fn_api_pb2.ProcessBundleDescriptor( +id=self.bundle_uid, +transforms={ +transform.unique_name: transform +for transform in self.stage.transforms +}, +pcollections=dict( +self.execution_context.pipeline_components.pcollections.items()), +
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411445 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570217 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411445) Time Spent: 1h 40m (was: 1.5h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411446 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570237 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411446) Time Spent: 1h 50m (was: 1h 40m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Est
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411419&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411419 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399555388 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment Review comment: Fix description. (Interestingly, I just did this change as well for another PR.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411419) Time Spent: 1h (was: 50m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411424&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411424 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399556279 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ Review comment: Prefer ()'s to backslashes for line breaks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411424) Time Spent: 1h 20m (was: 1h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411421 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399556846 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() Review comment: I don't know how critical this is for performance (mostly for tests), but `self.worker_handlers[0]` might be preferable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411421) Time Spent: 1h 10m (was: 1h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type:
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411422 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399557974 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: Perhaps add a state_servicer() method right on runner_execution_context (and use several places below)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411422) Time Spent: 1h 20m (was: 1h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411423 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399557293 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. 
+return next(iter(self.worker_handlers)).state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): +res = beam_fn_api_pb2.ProcessBundleDescriptor( +id=self.bundle_uid, +transforms={ +transform.unique_name: transform +for transform in self.stage.transforms +}, +pcollections=dict( +self.execution_context.pipeline_components.pcollections.items()), +
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411420 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399558218 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing( cache_token_generator=cache_token_generator) testing_bundle_manager.process_bundle(data_input, data_output) finally: -worker_handler.state.restore() + runner_execution_context.worker_handler_manager.state_servicer.restore() def _collect_written_timers_and_add_to_deferred_inputs( self, - pipeline_components, # type: beam_runner_api_pb2.Components - stage, # type: translations.Stage + runner_execution_context, # type: execution.FnApiRunnerExecutionContext bundle_context_manager, # type: execution.BundleContextManager deferred_inputs, # type: MutableMapping[str, PartitionableBuffer] - data_channel_coders, # type: Mapping[str, str] ): # type: (...) -> None -for transform_id, timer_writes in stage.timer_pcollections: +for transform_id, timer_writes in \ Review comment: backslash This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411420) Time Spent: 1h 10m (was: 1h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411425 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399558954 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: On second thought, perhaps the ownership of state servicer should be moved up to runner_execution_context (though the worker manager may need a reference). Your call. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411425) Time Spent: 1h 20m (was: 1h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409957 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 25/Mar/20 23:20 Start Date: 25/Mar/20 23:20 Worklog Time Spent: 10m Work Description: pabloem commented on issue #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#issuecomment-604139704 r: @robertwb I've added two classes under `execution`: - `FnApiExecutionContext`, which manages runner-related context utilities and variables, like intermediate pcollection buffers, worker handler manager, safe coders and pipeline context. - `BundleContextManager`, which manages bundle-execution-related utilities and variables, such as the process bundle descriptor, the worker handlers for that stage, and the callbacks for get_buffer and get_coder_impl. This change increases their scope to manage iterable_state_write, and worker handlers. The goal in the future (streaming) is for `FnApiExecutionContext` to manage a watermark manager. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 409957) Time Spent: 50m (was: 40m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409938&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409938 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 25/Mar/20 22:24 Start Date: 25/Mar/20 22:24 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_Va
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409824 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 25/Mar/20 20:58 Start Date: 25/Mar/20 20:58 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11202: [BEAM-9608] Refactoring FnApiRunner to have more context managers URL: https://github.com/apache/beam/pull/11202 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 409824) Time Spent: 0.5h (was: 20m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409729 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 25/Mar/20 18:54 Start Date: 25/Mar/20 18:54 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11202: [BEAM-9608] Refactoring FnApiRunner to have more context managers URL: https://github.com/apache/beam/pull/11202#discussion_r398094737 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -0,0 +1,337 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Set of utilities for execution of a pipeline by the FnApiRunner.""" + +from __future__ import absolute_import + +import collections +import itertools + +from typing_extensions import Protocol + +from apache_beam import coders +from apache_beam.coders.coder_impl import create_InputStream +from apache_beam.coders.coder_impl import create_OutputStream +from apache_beam.portability import common_urns +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.runners.portability.fn_api_runner.translations import only_element +from apache_beam.runners.portability.fn_api_runner.translations import split_buffer_id +from apache_beam.runners.worker import bundle_processor +from apache_beam.transforms import trigger +from apache_beam.transforms.window import GlobalWindow +from apache_beam.transforms.window import GlobalWindows +from apache_beam.utils import windowed_value + + +class Buffer(Protocol): + def __iter__(self): +# type: () -> Iterator[bytes] +pass + + def append(self, item): +# type: (bytes) -> None +pass + + +class PartitionableBuffer(Buffer, Protocol): + def partition(self, n): +# type: (int) -> List[List[bytes]] +pass + + +class ListBuffer(object): + """Used to support parititioning of a list.""" + def __init__(self, coder_impl): +self._coder_impl = coder_impl +self._inputs = [] # type: List[bytes] +self._grouped_output = None +self.cleared = False + + def append(self, element): +# type: (bytes) -> None +if self.cleared: + raise RuntimeError('Trying to append to a cleared ListBuffer.') +if self._grouped_output: + raise RuntimeError('ListBuffer append after read.') +self._inputs.append(element) + + def partition(self, n): +# type: (int) -> List[List[bytes]] +if self.cleared: + raise RuntimeError('Trying to partition a cleared ListBuffer.') +if len(self._inputs) >= n or len(self._inputs) == 0: + return [self._inputs[k::n] for k in range(n)] +else: + if not self._grouped_output: +output_stream_list = [create_OutputStream() for _ in 
range(n)] +idx = 0 +for input in self._inputs: + input_stream = create_InputStream(input) + while input_stream.size() > 0: +decoded_value = self._coder_impl.decode_from_stream( +input_stream, True) +self._coder_impl.encode_to_stream( +decoded_value, output_stream_list[idx], True) +idx = (idx + 1) % n +self._grouped_output = [[output_stream.get()] +for output_stream in output_stream_list] + return self._grouped_output + + def __iter__(self): +# type: () -> Iterator[bytes] +if self.cleared: + raise RuntimeError('Trying to iterate through a cleared ListBuffer.') +return iter(self._inputs) + + def clear(self): +# type: () -> None +self.cleared = True +self._inputs = [] +self._grouped_output = None + + +class GroupingBuffer(object): + """Used to accumulate groupded (shuffled) results.""" + def __init__(self, + pre_grouped_coder, # type: coders.Coder + post_grouped_coder, # type: coders.Coder + windowing + ): +# type: (...) -> None +self._key_coder = pre_grouped_coder.key_coder() +self._pre_grouped_coder = pre_grouped_coder +self._post_grouped_coder = post_grouped_coder +self._table = col
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409712 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 25/Mar/20 18:36 Start Date: 25/Mar/20 18:36 Worklog Time Spent: 10m Work Description: Hannah-Jiang commented on pull request #11202: [BEAM-9608] Refactoring FnApiRunner to have more context managers URL: https://github.com/apache/beam/pull/11202#discussion_r398079535 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -0,0 +1,337 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Set of utilities for execution of a pipeline by the FnApiRunner.""" + +from __future__ import absolute_import + +import collections +import itertools + +from typing_extensions import Protocol + +from apache_beam import coders +from apache_beam.coders.coder_impl import create_InputStream +from apache_beam.coders.coder_impl import create_OutputStream +from apache_beam.portability import common_urns +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.runners.portability.fn_api_runner.translations import only_element +from apache_beam.runners.portability.fn_api_runner.translations import split_buffer_id +from apache_beam.runners.worker import bundle_processor +from apache_beam.transforms import trigger +from apache_beam.transforms.window import GlobalWindow +from apache_beam.transforms.window import GlobalWindows +from apache_beam.utils import windowed_value + + +class Buffer(Protocol): + def __iter__(self): +# type: () -> Iterator[bytes] +pass + + def append(self, item): +# type: (bytes) -> None +pass + + +class PartitionableBuffer(Buffer, Protocol): + def partition(self, n): +# type: (int) -> List[List[bytes]] +pass + + +class ListBuffer(object): + """Used to support parititioning of a list.""" + def __init__(self, coder_impl): +self._coder_impl = coder_impl +self._inputs = [] # type: List[bytes] +self._grouped_output = None +self.cleared = False + + def append(self, element): +# type: (bytes) -> None +if self.cleared: + raise RuntimeError('Trying to append to a cleared ListBuffer.') +if self._grouped_output: + raise RuntimeError('ListBuffer append after read.') +self._inputs.append(element) + + def partition(self, n): +# type: (int) -> List[List[bytes]] +if self.cleared: + raise RuntimeError('Trying to partition a cleared ListBuffer.') +if len(self._inputs) >= n or len(self._inputs) == 0: + return [self._inputs[k::n] for k in range(n)] +else: + if not self._grouped_output: +output_stream_list = [create_OutputStream() for _ in 
range(n)] +idx = 0 +for input in self._inputs: + input_stream = create_InputStream(input) + while input_stream.size() > 0: +decoded_value = self._coder_impl.decode_from_stream( +input_stream, True) +self._coder_impl.encode_to_stream( +decoded_value, output_stream_list[idx], True) +idx = (idx + 1) % n +self._grouped_output = [[output_stream.get()] +for output_stream in output_stream_list] + return self._grouped_output + + def __iter__(self): +# type: () -> Iterator[bytes] +if self.cleared: + raise RuntimeError('Trying to iterate through a cleared ListBuffer.') +return iter(self._inputs) + + def clear(self): +# type: () -> None +self.cleared = True +self._inputs = [] +self._grouped_output = None + + +class GroupingBuffer(object): + """Used to accumulate groupded (shuffled) results.""" + def __init__(self, + pre_grouped_coder, # type: coders.Coder + post_grouped_coder, # type: coders.Coder + windowing + ): +# type: (...) -> None +self._key_coder = pre_grouped_coder.key_coder() +self._pre_grouped_coder = pre_grouped_coder +self._post_grouped_coder = post_grouped_coder +self._table