[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker
[ https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155803&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155803 ] ASF GitHub Bot logged work on BEAM-5521: Author: ASF GitHub Bot Created on: 18/Oct/18 09:45 Start Date: 18/Oct/18 09:45 Worklog Time Spent: 10m Work Description: robertwb closed pull request #6717: [BEAM-5521] Re-use bundle processors across bundles. URL: https://github.com/apache/beam/pull/6717 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index b177d2014d7..1836a82eb0c 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -147,6 +147,10 @@ def __init__(self, *args): super(CounterCell, self).__init__(*args) self.value = CounterAggregator.identity_element() + def reset(self): +self.commit = CellCommitState() +self.value = CounterAggregator.identity_element() + def combine(self, other): result = CounterCell() result.inc(self.value + other.value) @@ -189,6 +193,10 @@ def __init__(self, *args): super(DistributionCell, self).__init__(*args) self.data = DistributionAggregator.identity_element() + def reset(self): +self.commit = CellCommitState() +self.data = DistributionAggregator.identity_element() + def combine(self, other): result = DistributionCell() result.data = self.data.combine(other.data) @@ -230,6 +238,10 @@ def __init__(self, *args): super(GaugeCell, self).__init__(*args) self.data = GaugeAggregator.identity_element() + def reset(self): +self.commit = CellCommitState() +self.data = GaugeAggregator.identity_element() + def combine(self, other): result = GaugeCell() result.data = self.data.combine(other.data) diff --git 
a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 2d771394c23..5818ac2efa8 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -238,6 +238,14 @@ def to_runner_api_monitoring_infos(self, transform_id): )) return {monitoring_infos.to_key(mi) : mi for mi in all_user_metrics} + def reset(self): +for counter in self.counters.values(): + counter.reset() +for distribution in self.distributions.values(): + distribution.reset() +for gauge in self.gauges.values(): + gauge.reset() + class MetricUpdates(object): """Contains updates for several metrics. diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index e4737b4ad09..4fd0aace539 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -151,7 +151,6 @@ def __init__(self, state_handler, transform_id, tag, side_input_data, coder): self._element_coder = coder.wrapped_value_coder self._target_window_coder = coder.window_coder # TODO(robertwb): Limit the cache size. -# TODO(robertwb): Cross-bundle caching respecting cache tokens. self._cache = {} def __getitem__(self, window): @@ -205,6 +204,10 @@ def is_globally_windowed(self): return (self._side_input_data.window_mapping_fn == sideinputs._global_window_mapping_fn) + def reset(self): +# TODO(BEAM-5428): Cross-bundle caching respecting cache tokens. +self._cache = {} + class CombiningValueRuntimeState(userstate.RuntimeState): def __init__(self, underlying_bag_state, combinefn): @@ -310,6 +313,10 @@ def get_state(self, state_spec, key, window): else: raise NotImplementedError(state_spec) + def reset(self): +# TODO(BEAM-5428): Implement cross-bundle state caching. 
+pass + def memoize(func): cache = {} @@ -342,6 +349,8 @@ def __init__( 'fnapi-step-%s' % self.process_bundle_descriptor.id, self.counter_factory) self.ops = self.create_execution_tree(self.process_bundle_descriptor) +for op in self.ops.values(): + op.setup() def create_execution_tree(self, descriptor): @@ -385,6 +394,13 @@ def topological_height(transform_id): for transform_id in sorted( descriptor.transforms, key=topological_height, reverse=True)]) + def reset(self): +self.counter_factory.reset() +self.state_sampler.reset() +# Side input caches. +for op in self.ops.values(): + op.reset() + def process_bundle(self, instruction_id): expected_inputs = [] for op in self.ops.values(): diff --git a/sdks/pyth
[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker
[ https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155685 ] ASF GitHub Bot logged work on BEAM-5521: Author: ASF GitHub Bot Created on: 18/Oct/18 03:28 Start Date: 18/Oct/18 03:28 Worklog Time Spent: 10m Work Description: tweise commented on issue #6717: [BEAM-5521] Re-use bundle processors across bundles. URL: https://github.com/apache/beam/pull/6717#issuecomment-430863234 I see CPU reduction by 50% when running locally with parallelism 1. The throughput increases are much more visible when combined with https://github.com/apache/beam/pull/6723. Will take it to the cluster with both changes, the gains will be visible more clearly when running with high parallelism. @mwylde fyi This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 155685) Time Spent: 0.5h (was: 20m) > Cache execution trees in SDK worker > --- > > Key: BEAM-5521 > URL: https://issues.apache.org/jira/browse/BEAM-5521 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Labels: portability-flink > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently they are re-constructed from the protos for every bundle, which is > expensive (especially for 1-element bundles in streaming flink). > Care should be taken to ensure the objects can be re-used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker
[ https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155338 ] ASF GitHub Bot logged work on BEAM-5521: Author: ASF GitHub Bot Created on: 17/Oct/18 10:30 Start Date: 17/Oct/18 10:30 Worklog Time Spent: 10m Work Description: robertwb commented on issue #6717: [BEAM-5521] Re-use bundle processors across bundles. URL: https://github.com/apache/beam/pull/6717#issuecomment-430576644 R: @tweise This should improve throughput substantially. There is other per-bundle overhead (e.g. final counters and progress reports) that may need to be looked at as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 155338) Time Spent: 20m (was: 10m) > Cache execution trees in SDK worker > --- > > Key: BEAM-5521 > URL: https://issues.apache.org/jira/browse/BEAM-5521 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Labels: portability-flink > Time Spent: 20m > Remaining Estimate: 0h > > Currently they are re-constructed from the protos for every bundle, which is > expensive (especially for 1-element bundles in streaming flink). > Care should be taken to ensure the objects can be re-used. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5521) Cache execution trees in SDK worker
[ https://issues.apache.org/jira/browse/BEAM-5521?focusedWorklogId=155331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155331 ] ASF GitHub Bot logged work on BEAM-5521: Author: ASF GitHub Bot Created on: 17/Oct/18 10:14 Start Date: 17/Oct/18 10:14 Worklog Time Spent: 10m Work Description: robertwb opened a new pull request #6717: [BEAM-5521] Re-use bundle processors across bundles. URL: https://github.com/apache/beam/pull/6717 Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python 
| [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | --- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 155331) Time Spent: 10m Remaining Estimate: 0h > Cache execution trees in SDK worker > --- > > Key: BEAM-5521 > URL: https://issues.apache.org/jira/browse/BEAM-5521 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Ma