[ 
https://issues.apache.org/jira/browse/BEAM-6186?focusedWorklogId=176907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176907
 ]

ASF GitHub Bot logged work on BEAM-6186:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Dec/18 10:30
            Start Date: 19/Dec/18 10:30
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #7281: 
[BEAM-6186] Finish moving optimization phases.
URL: https://github.com/apache/beam/pull/7281#discussion_r242862511
 
 

 ##########
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
 ##########
 @@ -157,18 +163,121 @@ def deduplicate_read(self):
     self.transforms = new_transforms
 
 
+def memoize_on_instance(f):
+  missing = object()
+
+  def wrapper(self, *args):
+    try:
+      cache = getattr(self, '_cache_%s' % f.__name__)
+    except AttributeError:
+      cache = {}
+      setattr(self, '_cache_%s' % f.__name__, cache)
+    result = cache.get(args, missing)
+    if result is missing:
+      result = cache[args] = f(self, *args)
+    return result
+
+  return wrapper
+
+
 class TransformContext(object):
-  def __init__(self, components):
+
+  _KNOWN_CODER_URNS = set(
+      value.urn for value in common_urns.coders.__dict__.values())
+
+  def __init__(self, components, use_state_iterables=False):
     self.components = components
+    self.use_state_iterables = use_state_iterables
+    self.safe_coders = {}
+    self.bytes_coder_id = self.add_or_get_coder_id(
+        coders.BytesCoder().to_runner_api(None), 'bytes_coder')
 
-  def add_or_get_coder_id(self, coder_proto):
+  def add_or_get_coder_id(self, coder_proto, coder_prefix='coder'):
     for coder_id, coder in self.components.coders.items():
       if coder == coder_proto:
         return coder_id
-    new_coder_id = unique_name(self.components.coders, 'coder')
+    new_coder_id = unique_name(self.components.coders, coder_prefix)
     self.components.coders[new_coder_id].CopyFrom(coder_proto)
     return new_coder_id
 
+  @memoize_on_instance
+  def with_state_iterables(self, coder_id):
+    coder = self.components.coders[coder_id]
+    if coder.spec.spec.urn == common_urns.coders.ITERABLE.urn:
+      new_coder_id = unique_name(
+          self.components.coders, coder_id + '_state_backed')
+      new_coder = self.components.coders[new_coder_id]
+      new_coder.CopyFrom(coder)
+      new_coder.spec.spec.urn = common_urns.coders.STATE_BACKED_ITERABLE.urn
+      new_coder.spec.spec.payload = b'1'
+      new_coder.component_coder_ids[0] = self.with_state_iterables(
+          coder.component_coder_ids[0])
+      return new_coder_id
+    else:
+      new_component_ids = [
+          self.with_state_iterables(c) for c in coder.component_coder_ids]
+      if new_component_ids == coder.component_coder_ids:
+        return coder_id
+      else:
+        new_coder_id = unique_name(
+            self.components.coders, coder_id + '_state_backed')
+        self.components.coders[new_coder_id].CopyFrom(
+            beam_runner_api_pb2.Coder(
+                spec=coder.spec,
+                component_coder_ids=new_component_ids))
+        return new_coder_id
+
+  @memoize_on_instance
+  def length_prefixed_coder(self, coder_id):
+    if coder_id in self.safe_coders:
+      return coder_id
+    length_prefixed_id, safe_id = self.length_prefixed_and_safe_coder(coder_id)
+    self.safe_coders[length_prefixed_id] = safe_id
+    return length_prefixed_id
+
+  def length_prefixed_and_safe_coder(self, coder_id):
 
 Review comment:
   It's not generally called directly, but it's a good idea for consistency.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 176907)
    Time Spent: 1h 50m  (was: 1h 40m)

> Cleanup FnApiRunner optimization phases.
> ----------------------------------------
>
>                 Key: BEAM-6186
>                 URL: https://issues.apache.org/jira/browse/BEAM-6186
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> They are currently expressed as functions with closures. It would be good to
> pull them out with explicit dependencies, both to make the code easier to
> follow and to make them testable and reusable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to