[ https://issues.apache.org/jira/browse/BEAM-4593?focusedWorklogId=121029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121029 ]
ASF GitHub Bot logged work on BEAM-4593: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/Jul/18 21:19 Start Date: 09/Jul/18 21:19 Worklog Time Spent: 10m Work Description: charlesccychen closed pull request #5778: [BEAM-4593] Remove refcounts from the Python SDK URL: https://github.com/apache/beam/pull/5778 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6e9fccef3b5..5a4c1dc9228 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -47,7 +47,6 @@ from __future__ import absolute_import import abc -import collections import logging import os import re @@ -529,7 +528,6 @@ def apply(self, transform, pvalueish=None, label=None): 'output type-hint was found for the ' 'PTransform %s' % ptransform_name) - current.update_input_refcounts() self.transforms_stack.pop() return pvalueish_result @@ -699,30 +697,10 @@ def __init__(self, parent, transform, full_label, inputs): self.outputs = {} self.parts = [] - # Per tag refcount dictionary for PValues for which this node is a - # root producer. 
- self.refcounts = collections.defaultdict(int) - def __repr__(self): return "%s(%s, %s)" % (self.__class__.__name__, self.full_label, type(self.transform).__name__) - def update_input_refcounts(self): - """Increment refcounts for all transforms providing inputs.""" - - def real_producer(pv): - real = pv.producer - while real.parts: - real = real.parts[-1] - return real - - if not self.is_composite(): - for main_input in self.inputs: - if not isinstance(main_input, pvalue.PBegin): - real_producer(main_input).refcounts[main_input.tag] += 1 - for side_input in self.side_inputs: - real_producer(side_input.pvalue).refcounts[side_input.pvalue.tag] += 1 - def replace_output(self, output, tag=None): """Replaces the output defined by the given tag with the given output. @@ -885,7 +863,6 @@ def is_side_input(tag): pc = context.pcollections.get_by_id(pcoll_id) pc.producer = result pc.tag = None if tag == 'None' else tag - result.update_input_refcounts() return result diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 5028cb89a89..098d47e880a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -265,7 +265,6 @@ def visit_transform(self, transform_node): new_side_input.pvalue.producer = map_to_void_key map_to_void_key.add_output(new_side_input.pvalue) parent.add_part(map_to_void_key) - transform_node.update_input_refcounts() elif access_pattern == common_urns.side_inputs.MULTIMAP.urn: # Ensure the input coder is a KV coder and patch up the # access pattern to appease Dataflow. @@ -596,7 +595,6 @@ def run_ParDo(self, transform_node): # Attach side inputs. si_dict = {} - # We must call self._cache.get_pvalue exactly once due to refcounting. 
si_labels = {} full_label_counts = defaultdict(int) lookup_label = lambda side_pval: si_labels[side_pval]
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 121029)
Time Spent: 20m (was: 10m)

> Remove refcounts from the Python SDK
> ------------------------------------
>
> Key: BEAM-4593
> URL: https://issues.apache.org/jira/browse/BEAM-4593
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Charles Chen
> Assignee: Charles Chen
> Priority: Minor
> Time Spent: 20m
> Remaining Estimate: 0h
>
> We should remove PValue refcounting logic from the Python SDK. This is
> vestigial logic from the first DirectRunner implementation, and is not used
> anymore.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)