[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker

2018-10-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155803&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155803
 ]

ASF GitHub Bot logged work on BEAM-5521:


Author: ASF GitHub Bot
Created on: 18/Oct/18 09:45
Start Date: 18/Oct/18 09:45
Worklog Time Spent: 10m 
  Work Description: robertwb closed pull request #6717: [BEAM-5521] Re-use 
bundle processors across bundles.
URL: https://github.com/apache/beam/pull/6717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index b177d2014d7..1836a82eb0c 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -147,6 +147,10 @@ def __init__(self, *args):
     super(CounterCell, self).__init__(*args)
     self.value = CounterAggregator.identity_element()
 
+  def reset(self):
+    self.commit = CellCommitState()
+    self.value = CounterAggregator.identity_element()
+
   def combine(self, other):
     result = CounterCell()
     result.inc(self.value + other.value)
@@ -189,6 +193,10 @@ def __init__(self, *args):
     super(DistributionCell, self).__init__(*args)
     self.data = DistributionAggregator.identity_element()
 
+  def reset(self):
+    self.commit = CellCommitState()
+    self.data = DistributionAggregator.identity_element()
+
   def combine(self, other):
     result = DistributionCell()
     result.data = self.data.combine(other.data)
@@ -230,6 +238,10 @@ def __init__(self, *args):
     super(GaugeCell, self).__init__(*args)
     self.data = GaugeAggregator.identity_element()
 
+  def reset(self):
+    self.commit = CellCommitState()
+    self.data = GaugeAggregator.identity_element()
+
   def combine(self, other):
     result = GaugeCell()
     result.data = self.data.combine(other.data)
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index 2d771394c23..5818ac2efa8 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -238,6 +238,14 @@ def to_runner_api_monitoring_infos(self, transform_id):
       ))
     return {monitoring_infos.to_key(mi) : mi for mi in all_user_metrics}
 
+  def reset(self):
+    for counter in self.counters.values():
+      counter.reset()
+    for distribution in self.distributions.values():
+      distribution.reset()
+    for gauge in self.gauges.values():
+      gauge.reset()
+
 
 class MetricUpdates(object):
   """Contains updates for several metrics.
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index e4737b4ad09..4fd0aace539 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -151,7 +151,6 @@ def __init__(self, state_handler, transform_id, tag, side_input_data, coder):
     self._element_coder = coder.wrapped_value_coder
     self._target_window_coder = coder.window_coder
     # TODO(robertwb): Limit the cache size.
-    # TODO(robertwb): Cross-bundle caching respecting cache tokens.
     self._cache = {}
 
   def __getitem__(self, window):
@@ -205,6 +204,10 @@ def is_globally_windowed(self):
     return (self._side_input_data.window_mapping_fn
             == sideinputs._global_window_mapping_fn)
 
+  def reset(self):
+    # TODO(BEAM-5428): Cross-bundle caching respecting cache tokens.
+    self._cache = {}
+
 
 class CombiningValueRuntimeState(userstate.RuntimeState):
   def __init__(self, underlying_bag_state, combinefn):
@@ -310,6 +313,10 @@ def get_state(self, state_spec, key, window):
     else:
       raise NotImplementedError(state_spec)
 
+  def reset(self):
+    # TODO(BEAM-5428): Implement cross-bundle state caching.
+    pass
+
 
 def memoize(func):
   cache = {}
@@ -342,6 +349,8 @@ def __init__(
         'fnapi-step-%s' % self.process_bundle_descriptor.id,
         self.counter_factory)
     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
+    for op in self.ops.values():
+      op.setup()
 
   def create_execution_tree(self, descriptor):
 
@@ -385,6 +394,13 @@ def topological_height(transform_id):
         for transform_id in sorted(
             descriptor.transforms, key=topological_height, reverse=True)])
 
+  def reset(self):
+    self.counter_factory.reset()
+    self.state_sampler.reset()
+    # Side input caches.
+    for op in self.ops.values():
+      op.reset()
+
   def process_bundle(self, instruction_id):
     expected_inputs = []
     for op in self.ops.values():
diff --git a/sdks/pyth
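
The `reset()` methods in the diff above return each metric cell to its aggregator's identity element, and the container fans the reset out to every cell it holds, so cells can be reused by the next bundle instead of being rebuilt. A minimal standalone sketch of that pattern follows. `CounterCellSketch`, `DistributionCellSketch` and `MetricsContainerSketch` are hypothetical stand-ins, not the real `apache_beam.metrics` classes, and they leave out the `CellCommitState` bookkeeping that the real cells also reset.

```python
# Hypothetical, simplified stand-ins for Beam's metric cells and container;
# these are NOT the apache_beam.metrics classes and omit commit-state tracking.


class CounterCellSketch(object):
  """Integer sum whose identity element is 0."""

  def __init__(self):
    self.value = 0

  def inc(self, n=1):
    self.value += n

  def reset(self):
    # Back to the identity element so the cell can serve the next bundle.
    self.value = 0


class DistributionCellSketch(object):
  """Tracks count/sum/min/max; the identity element has count 0."""

  def __init__(self):
    self.reset()

  def update(self, x):
    self.count += 1
    self.sum += x
    self.min = x if self.min is None else min(self.min, x)
    self.max = x if self.max is None else max(self.max, x)

  def reset(self):
    self.count, self.sum, self.min, self.max = 0, 0, None, None


class MetricsContainerSketch(object):
  """Keeps cells alive across bundles and resets them in place."""

  def __init__(self):
    self.counters = {}
    self.distributions = {}

  def get_counter(self, name):
    if name not in self.counters:
      self.counters[name] = CounterCellSketch()
    return self.counters[name]

  def get_distribution(self, name):
    if name not in self.distributions:
      self.distributions[name] = DistributionCellSketch()
    return self.distributions[name]

  def reset(self):
    # Fan the reset out to every cell, as the container's reset() does above.
    for counter in self.counters.values():
      counter.reset()
    for distribution in self.distributions.values():
      distribution.reset()


if __name__ == '__main__':
  container = MetricsContainerSketch()
  cell = container.get_counter('elements')
  cell.inc(3)        # work done during bundle 1
  container.reset()  # between bundles
  assert container.get_counter('elements') is cell  # same object reused
  assert cell.value == 0
```

In this sketch `reset()` mutates cells in place, so a caller that kept a reference to a cell still holds the same, now-zeroed object after the reset.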

[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker

2018-10-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155685
 ]

ASF GitHub Bot logged work on BEAM-5521:


Author: ASF GitHub Bot
Created on: 18/Oct/18 03:28
Start Date: 18/Oct/18 03:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6717: [BEAM-5521] Re-use 
bundle processors across bundles.
URL: https://github.com/apache/beam/pull/6717#issuecomment-430863234
 
 
   I see a 50% reduction in CPU usage when running locally with parallelism 1. 
The throughput increase is much more visible when combined with 
https://github.com/apache/beam/pull/6723. I will take both changes to the 
cluster; the gains will be more clearly visible when running with high 
parallelism. @mwylde fyi  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 155685)
Time Spent: 0.5h  (was: 20m)

> Cache execution trees in SDK worker
> ---
>
> Key: BEAM-5521
> URL: https://issues.apache.org/jira/browse/BEAM-5521
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Labels: portability-flink
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently they are reconstructed from the protos for every bundle, which is 
> expensive (especially for 1-element bundles in streaming Flink). 
> Care should be taken to ensure the objects can be reused. 
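
The issue asks that execution trees be reused rather than rebuilt from the protos for every bundle, with care taken that the reused objects are safe to reuse. One way to sketch that, assuming a pool of idle processors keyed by bundle descriptor ID, is shown below: operations are set up once when a tree is built and reset after each bundle. `OperationSketch`, `BundleProcessorSketch` and `ProcessorPoolSketch` are hypothetical names for illustration only, not the SDK harness's code.

```python
# Hypothetical sketch of reusing execution trees across bundles; the names
# below are illustrative and are NOT the SDK harness's actual classes.


class OperationSketch(object):
  """Stand-in for one operation in an execution tree."""

  def __init__(self, name):
    self.name = name
    self.setup_calls = 0
    self.bundle_cache = {}  # e.g. side inputs fetched during a bundle

  def setup(self):
    # One-time initialization, done when the tree is first built.
    self.setup_calls += 1

  def reset(self):
    # Discard per-bundle state so the operation can serve another bundle.
    self.bundle_cache = {}


class BundleProcessorSketch(object):
  def __init__(self, transform_names):
    # Building the tree is the expensive step we want to do only once.
    self.ops = {name: OperationSketch(name) for name in transform_names}
    for op in self.ops.values():
      op.setup()

  def process_bundle(self, elements):
    for op in self.ops.values():
      op.bundle_cache['elements'] = list(elements)  # placeholder work
    return len(elements)

  def reset(self):
    for op in self.ops.values():
      op.reset()


class ProcessorPoolSketch(object):
  """Keeps idle processors per bundle descriptor instead of rebuilding them."""

  def __init__(self, descriptors):
    self._descriptors = descriptors  # descriptor id -> transform names
    self._idle = {}                  # descriptor id -> idle processors
    self.created = 0

  def process(self, descriptor_id, elements):
    idle = self._idle.setdefault(descriptor_id, [])
    if idle:
      processor = idle.pop()
    else:
      processor = BundleProcessorSketch(self._descriptors[descriptor_id])
      self.created += 1
    result = processor.process_bundle(elements)
    # Only a processor whose bundle finished is reset and returned to the
    # pool; if process_bundle raises, the processor is simply dropped.
    processor.reset()
    idle.append(processor)
    return result
```

Keeping a list of idle processors per descriptor, rather than a single instance, is an assumption that would let concurrent bundles for the same descriptor each take their own tree; the sketch itself is not thread-safe.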



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker

2018-10-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155338
 ]

ASF GitHub Bot logged work on BEAM-5521:


Author: ASF GitHub Bot
Created on: 17/Oct/18 10:30
Start Date: 17/Oct/18 10:30
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #6717: [BEAM-5521] Re-use 
bundle processors across bundles.
URL: https://github.com/apache/beam/pull/6717#issuecomment-430576644
 
 
   R: @tweise 
   
   This should improve throughput substantially. There is other per-bundle 
overhead (e.g. final counters and progress reports) that may need to be looked 
at as well. 



Issue Time Tracking
---

Worklog Id: (was: 155338)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker

2018-10-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155331
 ]

ASF GitHub Bot logged work on BEAM-5521:


Author: ASF GitHub Bot
Created on: 17/Oct/18 10:14
Start Date: 17/Oct/18 10:14
Worklog Time Spent: 10m 
  Work Description: robertwb opened a new pull request #6717: [BEAM-5521] 
Re-use bundle processors across bundles.
URL: https://github.com/apache/beam/pull/6717
 
 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   



Issue Time Tracking
---

Worklog Id: (was: 155331)
Time Spent: 10m
Remaining Estimate: 0h
