[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-04-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414259
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 19:41
Start Date: 01/Apr/20 19:41
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401863648
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,108 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
-id and a number of workers, and returns a list of ``WorkerHandler``s.
+:param worker_handler_manager: This class manages the set of worker
+handlers, and the communication with state / control APIs.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  @property
+  def state_servicer(self):
+# TODO(BEAM-9625): Ensure FnApiRunnerExecutionContext owns StateServicer
+return self.worker_handler_manager.state_servicer
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = (
+  self.execution_context.worker_handler_manager.get_worker_handlers(
+  self.stage.environment, self.num_workers))
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return self.worker_handlers[0].data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return self.worker_handlers[0].state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
 
 Review comment:
   I was also bitten by this. I am trying to think of a better way

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-04-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414260
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 19:41
Start Date: 01/Apr/20 19:41
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401863675
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
 
   def _run_stage(self,
  runner_execution_context,  # type: 
execution.FnApiRunnerExecutionContext
- stage,  # type: translations.Stage
+ bundle_context_manager,  # type: 
execution.BundleContextManager
 
 Review comment:
   I'm adding all these comments to https://github.com/apache/beam/pull/11270
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 414260)
Time Spent: 3h 40m  (was: 3.5h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-04-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414221
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 18:45
Start Date: 01/Apr/20 18:45
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401830220
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,108 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
-id and a number of workers, and returns a list of ``WorkerHandler``s.
+:param worker_handler_manager: This class manages the set of worker
+handlers, and the communication with state / control APIs.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  @property
+  def state_servicer(self):
+# TODO(BEAM-9625): Ensure FnApiRunnerExecutionContext owns StateServicer
+return self.worker_handler_manager.state_servicer
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = (
+  self.execution_context.worker_handler_manager.get_worker_handlers(
+  self.stage.environment, self.num_workers))
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return self.worker_handlers[0].data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return self.worker_handlers[0].state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
 
 Review comment:
   I was bitten by the fact that it is an error to access the 
pr

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-04-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414222&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414222
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 18:45
Start Date: 01/Apr/20 18:45
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401831550
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
 
   def _run_stage(self,
  runner_execution_context,  # type: 
execution.FnApiRunnerExecutionContext
- stage,  # type: translations.Stage
+ bundle_context_manager,  # type: 
execution.BundleContextManager
 
 Review comment:
   On this note, perhaps it makes sense to break FnApiRunner into the (mostly 
stateless) runner that can execute multiple pipelines and an executor (that has 
methods like run_stage) that might be stateful and is initialized with and 
tasked with running a single pipeline. Much of what is on context(s) would 
become state of self of this new object. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 414222)
Time Spent: 3h 20m  (was: 3h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-04-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=414220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414220
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 18:45
Start Date: 01/Apr/20 18:45
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401828861
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -511,87 +509,32 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
 
   def _run_stage(self,
  runner_execution_context,  # type: 
execution.FnApiRunnerExecutionContext
- stage,  # type: translations.Stage
+ bundle_context_manager,  # type: 
execution.BundleContextManager
 
 Review comment:
   It's odd that _run_stage no longer takes as a parameter the stage to run. 
Perhaps bundle_context_manager (and its class?) should be named stage_context 
or similar? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 414220)
Time Spent: 3h  (was: 2h 50m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=413718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-413718
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 03:19
Start Date: 01/Apr/20 03:19
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 413718)
Time Spent: 2h 50m  (was: 2h 40m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=413641&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-413641
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 01/Apr/20 00:40
Start Date: 01/Apr/20 00:40
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r401293360
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   Link to JIRA for reference?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 413641)
Time Spent: 2h 40m  (was: 2.5h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=412675&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-412675
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 30/Mar/20 23:08
Start Date: 30/Mar/20 23:08
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#issuecomment-606298444
 
 
   @robertwb thoughts?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 412675)
Time Spent: 2.5h  (was: 2h 20m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411449&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411449
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:43
Start Date: 27/Mar/20 22:43
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570655
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing(
 cache_token_generator=cache_token_generator)
 testing_bundle_manager.process_bundle(data_input, data_output)
   finally:
-worker_handler.state.restore()
+
runner_execution_context.worker_handler_manager.state_servicer.restore()
 
   def _collect_written_timers_and_add_to_deferred_inputs(
   self,
-  pipeline_components,  # type: beam_runner_api_pb2.Components
-  stage,  # type: translations.Stage
+  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
   bundle_context_manager,  # type: execution.BundleContextManager
   deferred_inputs,  # type: MutableMapping[str, PartitionableBuffer]
-  data_channel_coders,  # type: Mapping[str, str]
   ):
 # type: (...) -> None
 
-for transform_id, timer_writes in stage.timer_pcollections:
+for transform_id, timer_writes in \
 
 Review comment:
   Done
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411449)
Time Spent: 2h 20m  (was: 2h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411448
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:43
Start Date: 27/Mar/20 22:43
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570601
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   I've just added the state_servicer() from your first comment, but I've 
created a jira issue to track fixing that later on.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411448)
Time Spent: 2h 10m  (was: 2h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411444
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570204
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 
 Review comment:
   Done.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411444)
Time Spent: 1.5h  (was: 1h 20m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411447
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570251
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
+res = beam_fn_api_pb2.ProcessBundleDescriptor(
+id=self.bundle_uid,
+transforms={
+transform.unique_name: transform
+for transform in self.stage.transforms
+},
+pcollections=dict(
+self.execution_context.pipeline_components.pcollections.items()),
+   

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411445
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570217
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
 
 Review comment:
   Done.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411445)
Time Spent: 1h 40m  (was: 1.5h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411446
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570237
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
 
 Review comment:
   Done.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411446)
Time Spent: 1h 50m  (was: 1h 40m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Est

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411419&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411419
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399555388
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 
 Review comment:
   Fix description. 
   
   (Interestingly, I just did this change as well for another PR.)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411419)
Time Spent: 1h  (was: 50m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411424&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411424
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399556279
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
 
 Review comment:
   Prefer ()'s to backslashes for line breaks.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411424)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411421
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399556846
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
 
 Review comment:
   I don't know how critical this is for performance (mostly for tests), but 
`self.worker_handlers[0]` might be preferable. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411421)
Time Spent: 1h 10m  (was: 1h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type:

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411422
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399557974
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   Perhaps add a state_servicer() method right on runner_execution_context (and 
use several places below)?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411422)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411423
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399557293
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
+res = beam_fn_api_pb2.ProcessBundleDescriptor(
+id=self.bundle_uid,
+transforms={
+transform.unique_name: transform
+for transform in self.stage.transforms
+},
+pcollections=dict(
+self.execution_context.pipeline_components.pcollections.items()),
+  

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411420
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399558218
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing(
 cache_token_generator=cache_token_generator)
 testing_bundle_manager.process_bundle(data_input, data_output)
   finally:
-worker_handler.state.restore()
+
runner_execution_context.worker_handler_manager.state_servicer.restore()
 
   def _collect_written_timers_and_add_to_deferred_inputs(
   self,
-  pipeline_components,  # type: beam_runner_api_pb2.Components
-  stage,  # type: translations.Stage
+  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
   bundle_context_manager,  # type: execution.BundleContextManager
   deferred_inputs,  # type: MutableMapping[str, PartitionableBuffer]
-  data_channel_coders,  # type: Mapping[str, str]
   ):
 # type: (...) -> None
 
-for transform_id, timer_writes in stage.timer_pcollections:
+for transform_id, timer_writes in \
 
 Review comment:
   backslash
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411420)
Time Spent: 1h 10m  (was: 1h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411425
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399558954
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   On second thought, perhaps the ownership of state servicer should be moved 
up to runner_execution_context (though the worker manager may need a 
reference). Your call. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411425)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409957
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 25/Mar/20 23:20
Start Date: 25/Mar/20 23:20
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#issuecomment-604139704
 
 
   r: @robertwb 
   I've added two classes under `execution`: 
   - `FnApiRunnerExecutionContext`, which manages runner-related context utilities 
and variables, like intermediate pcollection buffers, worker handler manager, 
safe coders and pipeline context.
   - `BundleContextManager`, which manages bundle-execution-related utilities 
and variables, such as the process bundle descriptor, the worker handlers for 
that stage, and the callbacks for get_buffer and get_coder_impl.
   
   This change increases their scope to manage iterable_state_write, and worker 
handlers. The goal in the future (streaming) is for `FnApiRunnerExecutionContext` to 
manage a watermark manager.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 409957)
Time Spent: 50m  (was: 40m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409938&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409938
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 25/Mar/20 22:24
Start Date: 25/Mar/20 22:24
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_Va

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409824
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 25/Mar/20 20:58
Start Date: 25/Mar/20 20:58
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11202: [BEAM-9608] 
Refactoring FnApiRunner to have more context managers
URL: https://github.com/apache/beam/pull/11202
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 409824)
Time Spent: 0.5h  (was: 20m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409729
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 25/Mar/20 18:54
Start Date: 25/Mar/20 18:54
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11202: [BEAM-9608] 
Refactoring FnApiRunner to have more context managers
URL: https://github.com/apache/beam/pull/11202#discussion_r398094737
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -0,0 +1,337 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Set of utilities for execution of a pipeline by the FnApiRunner."""
+
+from __future__ import absolute_import
+
+import collections
+import itertools
+
+from typing_extensions import Protocol
+
+from apache_beam import coders
+from apache_beam.coders.coder_impl import create_InputStream
+from apache_beam.coders.coder_impl import create_OutputStream
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.portability.fn_api_runner.translations import 
only_element
+from apache_beam.runners.portability.fn_api_runner.translations import 
split_buffer_id
+from apache_beam.runners.worker import bundle_processor
+from apache_beam.transforms import trigger
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import windowed_value
+
+
+class Buffer(Protocol):
+  def __iter__(self):
+# type: () -> Iterator[bytes]
+pass
+
+  def append(self, item):
+# type: (bytes) -> None
+pass
+
+
+class PartitionableBuffer(Buffer, Protocol):
+  def partition(self, n):
+# type: (int) -> List[List[bytes]]
+pass
+
+
+class ListBuffer(object):
+  """Used to support parititioning of a list."""
+  def __init__(self, coder_impl):
+self._coder_impl = coder_impl
+self._inputs = []  # type: List[bytes]
+self._grouped_output = None
+self.cleared = False
+
+  def append(self, element):
+# type: (bytes) -> None
+if self.cleared:
+  raise RuntimeError('Trying to append to a cleared ListBuffer.')
+if self._grouped_output:
+  raise RuntimeError('ListBuffer append after read.')
+self._inputs.append(element)
+
+  def partition(self, n):
+# type: (int) -> List[List[bytes]]
+if self.cleared:
+  raise RuntimeError('Trying to partition a cleared ListBuffer.')
+if len(self._inputs) >= n or len(self._inputs) == 0:
+  return [self._inputs[k::n] for k in range(n)]
+else:
+  if not self._grouped_output:
+output_stream_list = [create_OutputStream() for _ in range(n)]
+idx = 0
+for input in self._inputs:
+  input_stream = create_InputStream(input)
+  while input_stream.size() > 0:
+decoded_value = self._coder_impl.decode_from_stream(
+input_stream, True)
+self._coder_impl.encode_to_stream(
+decoded_value, output_stream_list[idx], True)
+idx = (idx + 1) % n
+self._grouped_output = [[output_stream.get()]
+for output_stream in output_stream_list]
+  return self._grouped_output
+
+  def __iter__(self):
+# type: () -> Iterator[bytes]
+if self.cleared:
+  raise RuntimeError('Trying to iterate through a cleared ListBuffer.')
+return iter(self._inputs)
+
+  def clear(self):
+# type: () -> None
+self.cleared = True
+self._inputs = []
+self._grouped_output = None
+
+
+class GroupingBuffer(object):
+  """Used to accumulate groupded (shuffled) results."""
+  def __init__(self,
+   pre_grouped_coder,  # type: coders.Coder
+   post_grouped_coder,  # type: coders.Coder
+   windowing
+  ):
+# type: (...) -> None
+self._key_coder = pre_grouped_coder.key_coder()
+self._pre_grouped_coder = pre_grouped_coder
+self._post_grouped_coder = post_grouped_coder
+self._table = col

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=409712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-409712
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 25/Mar/20 18:36
Start Date: 25/Mar/20 18:36
Worklog Time Spent: 10m 
  Work Description: Hannah-Jiang commented on pull request #11202: 
[BEAM-9608] Refactoring FnApiRunner to have more context managers
URL: https://github.com/apache/beam/pull/11202#discussion_r398079535
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -0,0 +1,337 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Set of utilities for execution of a pipeline by the FnApiRunner."""
+
+from __future__ import absolute_import
+
+import collections
+import itertools
+
+from typing_extensions import Protocol
+
+from apache_beam import coders
+from apache_beam.coders.coder_impl import create_InputStream
+from apache_beam.coders.coder_impl import create_OutputStream
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.portability.fn_api_runner.translations import 
only_element
+from apache_beam.runners.portability.fn_api_runner.translations import 
split_buffer_id
+from apache_beam.runners.worker import bundle_processor
+from apache_beam.transforms import trigger
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import windowed_value
+
+
+class Buffer(Protocol):
+  def __iter__(self):
+# type: () -> Iterator[bytes]
+pass
+
+  def append(self, item):
+# type: (bytes) -> None
+pass
+
+
+class PartitionableBuffer(Buffer, Protocol):
+  def partition(self, n):
+# type: (int) -> List[List[bytes]]
+pass
+
+
+class ListBuffer(object):
+  """Used to support parititioning of a list."""
+  def __init__(self, coder_impl):
+self._coder_impl = coder_impl
+self._inputs = []  # type: List[bytes]
+self._grouped_output = None
+self.cleared = False
+
+  def append(self, element):
+# type: (bytes) -> None
+if self.cleared:
+  raise RuntimeError('Trying to append to a cleared ListBuffer.')
+if self._grouped_output:
+  raise RuntimeError('ListBuffer append after read.')
+self._inputs.append(element)
+
+  def partition(self, n):
+# type: (int) -> List[List[bytes]]
+if self.cleared:
+  raise RuntimeError('Trying to partition a cleared ListBuffer.')
+if len(self._inputs) >= n or len(self._inputs) == 0:
+  return [self._inputs[k::n] for k in range(n)]
+else:
+  if not self._grouped_output:
+output_stream_list = [create_OutputStream() for _ in range(n)]
+idx = 0
+for input in self._inputs:
+  input_stream = create_InputStream(input)
+  while input_stream.size() > 0:
+decoded_value = self._coder_impl.decode_from_stream(
+input_stream, True)
+self._coder_impl.encode_to_stream(
+decoded_value, output_stream_list[idx], True)
+idx = (idx + 1) % n
+self._grouped_output = [[output_stream.get()]
+for output_stream in output_stream_list]
+  return self._grouped_output
+
+  def __iter__(self):
+# type: () -> Iterator[bytes]
+if self.cleared:
+  raise RuntimeError('Trying to iterate through a cleared ListBuffer.')
+return iter(self._inputs)
+
+  def clear(self):
+# type: () -> None
+self.cleared = True
+self._inputs = []
+self._grouped_output = None
+
+
+class GroupingBuffer(object):
+  """Used to accumulate groupded (shuffled) results."""
+  def __init__(self,
+   pre_grouped_coder,  # type: coders.Coder
+   post_grouped_coder,  # type: coders.Coder
+   windowing
+  ):
+# type: (...) -> None
+self._key_coder = pre_grouped_coder.key_coder()
+self._pre_grouped_coder = pre_grouped_coder
+self._post_grouped_coder = post_grouped_coder
+self._table