[ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414259
 ]

ASF GitHub Bot logged work on BEAM-9608:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Apr/20 19:41
            Start Date: 01/Apr/20 19:41
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401863648
 
 

 ##########
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##########
 @@ -245,37 +254,108 @@ class FnApiRunnerExecutionContext(object):
        ``beam.PCollection``.
  """
   def __init__(self,
-      worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+      worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
       pipeline_components,  # type: beam_runner_api_pb2.Components
       safe_coders,
       data_channel_coders,
                ):
     """
-    :param worker_handler_factory: A ``callable`` that takes in an environment
-        id and a number of workers, and returns a list of ``WorkerHandler``s.
+    :param worker_handler_manager: This class manages the set of worker
+        handlers, and the communication with state / control APIs.
     :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
     :param safe_coders:
     :param data_channel_coders:
     """
     self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-    self.worker_handler_factory = worker_handler_factory
+    self.worker_handler_manager = worker_handler_manager
     self.pipeline_components = pipeline_components
     self.safe_coders = safe_coders
     self.data_channel_coders = data_channel_coders
 
+    self.pipeline_context = pipeline_context.PipelineContext(
+        self.pipeline_components,
+        iterable_state_write=self._iterable_state_write)
+    self._last_uid = -1
+
+  @property
+  def state_servicer(self):
+    # TODO(BEAM-9625): Ensure FnApiRunnerExecutionContext owns StateServicer
+    return self.worker_handler_manager.state_servicer
+
+  def next_uid(self):
+    self._last_uid += 1
+    return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+    # type: (...) -> bytes
+    token = unique_name(None, 'iter').encode('ascii')
+    out = create_OutputStream()
+    for element in values:
+      element_coder_impl.encode_to_stream(element, out, True)
+    self.worker_handler_manager.state_servicer.append_raw(
+        beam_fn_api_pb2.StateKey(
+            runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+        out.get())
+    return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-      execution_context, # type: FnApiRunnerExecutionContext
-      process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-      worker_handler,  # type: fn_runner.WorkerHandler
-      p_context,  # type: pipeline_context.PipelineContext
-               ):
+               execution_context, # type: FnApiRunnerExecutionContext
+               stage,  # type: translations.Stage
+               num_workers,  # type: int
+              ):
     self.execution_context = execution_context
-    self.process_bundle_descriptor = process_bundle_descriptor
-    self.worker_handler = worker_handler
-    self.pipeline_context = p_context
+    self.stage = stage
+    self.bundle_uid = self.execution_context.next_uid()
+    self.num_workers = num_workers
+
+    # Properties that are lazily initialized
+    self._process_bundle_descriptor = None
+    self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+    if self._worker_handlers is None:
+      self._worker_handlers = (
+          self.execution_context.worker_handler_manager.get_worker_handlers(
+              self.stage.environment, self.num_workers))
+    return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+    # All worker_handlers share the same grpc server, so we can read grpc 
server
+    # info from any worker_handler and read from the first worker_handler.
+    return self.worker_handlers[0].data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+    # All worker_handlers share the same grpc server, so we can read grpc 
server
+    # info from any worker_handler and read from the first worker_handler.
+    return self.worker_handlers[0].state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+    if self._process_bundle_descriptor is None:
+      self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+    return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
 
 Review comment:
   I was also bitten by this. I am trying to think of a better way to mark a 
Read transform as an Impulse Read, but can't think of one at the moment.
   
   I'll add documentation for this. 
   
   Once it's fully streaming, the IMPULSE buffers will be enqueued before the 
pipeline starts, and extract_outputs will work with non-impulse buffers only - 
thus we can modify the transform payload before starting the pipeline, instead 
of computing the bundle descriptor lazily.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 414259)
    Time Spent: 3.5h  (was: 3h 20m)

> Add context managers for FnApiRunner to manage execution of each bundle
> -----------------------------------------------------------------------
>
>                 Key: BEAM-9608
>                 URL: https://issues.apache.org/jira/browse/BEAM-9608
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>             Fix For: 2.21.0
>
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to