[ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421586
 ]

ASF GitHub Bot logged work on BEAM-9639:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/20 19:38
            Start Date: 13/Apr/20 19:38
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407666877
 
 

 ##########
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##########
 @@ -149,15 +155,26 @@ def no_overlap(a, b):
     return (
         not consumer.forced_root and not self in consumer.must_follow and
         not self.is_runner_urn(context) and
-        not consumer.is_runner_urn(context) and
-        no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+        not consumer.is_runner_urn(context) and no_overlap(
+            set(self.downstream_side_inputs.keys()),
+            {i
+             for i, _, _ in consumer.side_inputs()}))
+
+  def _fuse_downstream_side_inputs(self, other):
+    res = dict(self.downstream_side_inputs)
+    for si, other_si_ids in other.downstream_side_inputs.items():
+      if si in res:
+        res[si] = union(res[si], other_si_ids)
+      else:
+        res[si] = other_si_ids
+    return res
 
   def fuse(self, other):
     # type: (Stage) -> Stage
     return Stage(
         "(%s)+(%s)" % (self.name, other.name),
         self.transforms + other.transforms,
-        union(self.downstream_side_inputs, other.downstream_side_inputs),
+        self._fuse_downstream_side_inputs(other),
 
 Review comment:
   Nit: this sounds like it mutates self.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 421586)
    Time Spent: 2h 10m  (was: 2h)

> Abstract bundle execution logic from stage execution logic
> ----------------------------------------------------------
>
>                 Key: BEAM-9639
>                 URL: https://issues.apache.org/jira/browse/BEAM-9639
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to