[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-05-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=432228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432228
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 08/May/20 19:46
Start Date: 08/May/20 19:46
Worklog Time Spent: 10m 
  Work Description: ibzib commented on a change in pull request #11270:
URL: https://github.com/apache/beam/pull/11270#discussion_r422338193



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##
@@ -240,6 +240,30 @@ def test_multimap_side_input(self):
   lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
   equal_to([('a', [1, 3]), ('b', [2])]))
 
+  def test_multimap_multiside_input(self):

Review comment:
   Thanks for reporting, Boyuan; this was a flaw in the Spark runner. Fix: 
#11644





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 432228)
Time Spent: 6h 10m  (was: 6h)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works in a per-stage manner and does not abstract 
> single-bundle execution much. This work item is to clearly define the code that 
> executes a single bundle.
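To make the stage/bundle split concrete, here is a minimal sketch of the idea. Every name in it (`BundleResult`, `run_bundle`, `run_stage`, `max_elements`) is hypothetical and does not come from the FnApiRunner; it only assumes that a stage is run as a loop of bundles, and that each bundle may hand back unprocessed (residual) input for the stage to reschedule.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

# Hypothetical shapes for illustration; these are not FnApiRunner classes.
Elements = List[bytes]
ProcessFn = Callable[[bytes], Tuple[str, bytes]]


@dataclass
class BundleResult:
  """Everything one bundle produced, plus input it did not get to."""
  outputs: Dict[str, Elements] = field(default_factory=dict)
  deferred_inputs: Elements = field(default_factory=list)


def run_bundle(process: ProcessFn, inputs: Elements,
               max_elements: int) -> BundleResult:
  """Executes exactly one bundle over at most `max_elements` inputs."""
  result = BundleResult()
  for element in inputs[:max_elements]:
    tag, value = process(element)
    result.outputs.setdefault(tag, []).append(value)
  # Unprocessed input goes back to the caller as residual work.
  result.deferred_inputs = inputs[max_elements:]
  return result


def run_stage(process: ProcessFn, inputs: Elements,
              max_elements: int) -> Tuple[Dict[str, Elements], int]:
  """Stage-level loop: schedules bundles until no input remains.

  Returns the merged outputs and the number of bundles that were run.
  """
  if max_elements <= 0:
    raise ValueError('max_elements must be positive')
  merged: Dict[str, Elements] = {}
  pending = inputs
  bundles = 0
  while pending:
    result = run_bundle(process, pending, max_elements)
    bundles += 1
    for tag, values in result.outputs.items():
      merged.setdefault(tag, []).extend(values)
    pending = result.deferred_inputs
  return merged, bundles
```

For example, `run_stage(lambda e: ('upper', e.upper()), [b'a', b'b', b'c'], max_elements=2)` returns `({'upper': [b'A', b'B', b'C']}, 2)`. The point of the split is that `run_bundle` knows nothing about retries or merging, so it can be reused by a future streaming loop.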



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=429180=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-429180
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 30/Apr/20 18:20
Start Date: 30/Apr/20 18:20
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on a change in pull request #11270:
URL: https://github.com/apache/beam/pull/11270#discussion_r418202864



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##
@@ -240,6 +240,30 @@ def test_multimap_side_input(self):
   lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
   equal_to([('a', [1, 3]), ('b', [2])]))
 
+  def test_multimap_multiside_input(self):

Review comment:
   This test breaks the Spark ValidatesRunner (VR) test: 
https://issues.apache.org/jira/browse/BEAM-9862. Please either support the same 
functionality in Spark or sickbay the test.







Issue Time Tracking
---

Worklog Id: (was: 429180)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=427949=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-427949
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 28/Apr/20 03:29
Start Date: 28/Apr/20 03:29
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11544:
URL: https://github.com/apache/beam/pull/11544#issuecomment-620357951


   wheew : )





Issue Time Tracking
---

Worklog Id: (was: 427949)
Time Spent: 5h 50m  (was: 5h 40m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=427916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-427916
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 28/Apr/20 01:36
Start Date: 28/Apr/20 01:36
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11544:
URL: https://github.com/apache/beam/pull/11544#issuecomment-620325111


   The test seems to be failing anyway.
   @chamikaramj 





Issue Time Tracking
---

Worklog Id: (was: 427916)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=427903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-427903
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 28/Apr/20 00:28
Start Date: 28/Apr/20 00:28
Worklog Time Spent: 10m 
  Work Description: pabloem opened a new pull request #11544:
URL: https://github.com/apache/beam/pull/11544


   Reverting this PR as PreCommits are very flaky
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=425851=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425851
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 21/Apr/20 19:22
Start Date: 21/Apr/20 19:22
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270:
URL: https://github.com/apache/beam/pull/11270#issuecomment-617363729


   Run Python2_PVR_Flink PreCommit





Issue Time Tracking
---

Worklog Id: (was: 425851)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=425817=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425817
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 21/Apr/20 18:15
Start Date: 21/Apr/20 18:15
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #11270:
URL: https://github.com/apache/beam/pull/11270#discussion_r412384268



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##
@@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
+ Tuple[bytes, beam_runner_api_pb2.FunctionSpec]]
+
 
 class Stage(object):
   """A set of Transforms that can be sent to the worker for processing."""
   def __init__(self,
name,  # type: str
transforms,  # type: List[beam_runner_api_pb2.PTransform]
-   downstream_side_inputs=None,  # type: Optional[FrozenSet[str]]
+   downstream_side_inputs=None,  # type: Optional[Dict[str, SideInputId]]

Review comment:
   Discussed offline, but capturing here for the record. These sets contain 
the transitive collection of everything downstream of any side-input-consuming 
transform, and as such can be large even if the total number of side inputs is 
small. (The number of distinct such sets is about the same as the number of 
side inputs, so we keep total memory use down by re-using them; giving each 
transform its own copy could easily be O(n^2).)
   
   Your change of computing the side input mapping after the graph has been 
fused is good (and arguably better, as you only need the immediate consumers, 
and don't have to re-compute each time a stage is fused).
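The set-sharing trick described above can be sketched as follows. This is an illustration under stated assumptions, not the code in `translations.py`: the graph is a hypothetical dict from node name to the names of its consumers, each node is treated as producing one PCollection of the same name, and `union` is a helper written here to return an *existing* frozenset whenever the result would equal one of its arguments, so that most nodes end up pointing at the same set object instead of a private copy.

```python
from typing import Dict, FrozenSet, List


def union(a: FrozenSet[str], b: FrozenSet[str]) -> FrozenSet[str]:
  """Set union that reuses an input object when the result equals it."""
  if a <= b:
    return b  # a adds nothing, so share b's object
  if b <= a:
    return a  # b adds nothing, so share a's object
  return a | b  # only now allocate a new set


def downstream_side_inputs(
    consumers: Dict[str, List[str]],
    side_inputs: FrozenSet[str]) -> Dict[str, FrozenSet[str]]:
  """For each node, the side-input PCollections at or downstream of it.

  `consumers` is a hypothetical adjacency map (node -> consuming nodes).
  Results are memoized, and `union` keeps the number of distinct set
  objects close to the number of distinct answers.
  """
  memo: Dict[str, FrozenSet[str]] = {}

  def visit(node: str) -> FrozenSet[str]:
    if node not in memo:
      result: FrozenSet[str] = frozenset()
      if node in side_inputs:
        result = union(result, frozenset([node]))
      for consumer in consumers.get(node, []):
        result = union(result, visit(consumer))
      memo[node] = result
    return memo[node]

  for node in consumers:
    visit(node)
  return memo
```

On a chain `a -> b -> c -> d` where `c` is used as a side input, `a`, `b` and `c` all map to the very same `frozenset({'c'})` object, which is the memory saving the comment refers to.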







Issue Time Tracking
---

Worklog Id: (was: 425817)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=424777=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424777
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 18/Apr/20 06:54
Start Date: 18/Apr/20 06:54
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-615629278
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 424777)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=424776=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424776
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 18/Apr/20 06:54
Start Date: 18/Apr/20 06:54
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-615628912
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 424776)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=424535=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424535
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 18/Apr/20 00:32
Start Date: 18/Apr/20 00:32
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-615521653
 
 
   Building the side input index elsewhere. LMK what you think.
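Building the index once, after fusion, only needs each transform's immediate side inputs, as the review thread notes. A minimal sketch, assuming simplified hypothetical `Stage`/`Transform` dataclasses rather than the runner's protobuf-backed stages; `SideInputId` mirrors the `(consumer ParDo, tag)` tuple from the diff, and `build_side_input_index` is an illustrative name.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

SideInputId = Tuple[str, str]  # (consuming transform name, side input tag)


@dataclass
class Transform:
  name: str
  # side input tag -> PCollection id (hypothetical, simplified shape)
  side_inputs: Dict[str, str] = field(default_factory=dict)


@dataclass
class Stage:
  name: str
  transforms: List[Transform]


def build_side_input_index(
    stages: List[Stage]) -> Dict[str, Set[SideInputId]]:
  """Maps each side-input PCollection to its immediate consumers.

  Runs once over the already-fused stages, so nothing has to be
  recomputed or merged each time two stages are fused.
  """
  index: Dict[str, Set[SideInputId]] = defaultdict(set)
  for stage in stages:
    for transform in stage.transforms:
      for tag, pcoll in transform.side_inputs.items():
        index[pcoll].add((transform.name, tag))
  return dict(index)
```

Because the pass happens after fusion, the memory cost is proportional to the number of (consumer, tag) pairs rather than to the size of every transitive downstream set.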
 



Issue Time Tracking
---

Worklog Id: (was: 424535)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=423061=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423061
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 20:56
Start Date: 15/Apr/20 20:56
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-614275628
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 423061)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=423050=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423050
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 20:43
Start Date: 15/Apr/20 20:43
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-614269899
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 423050)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=423046=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423046
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 20:40
Start Date: 15/Apr/20 20:40
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-614268696
 
 
   The failing test is the streaming wordcount test.
 



Issue Time Tracking
---

Worklog Id: (was: 423046)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=423002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423002
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 19:13
Start Date: 15/Apr/20 19:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-614228294
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 423002)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422913
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 17:48
Start Date: 15/Apr/20 17:48
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-614184800
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 422913)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422451
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:17
Start Date: 15/Apr/20 00:17
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408508964
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
+ Tuple[bytes, beam_runner_api_pb2.FunctionSpec]]
+
 
 class Stage(object):
   """A set of Transforms that can be sent to the worker for processing."""
   def __init__(self,
name,  # type: str
transforms,  # type: List[beam_runner_api_pb2.PTransform]
-   downstream_side_inputs=None,  # type: Optional[FrozenSet[str]]
+   downstream_side_inputs=None,  # type: Optional[Dict[str, SideInputId]]
 
 Review comment:
   Hm, so this change breaks that, so the memory requirements would be larger. I 
would expect them not to be too bad, since most graphs don't have many side 
inputs going to many places. What do you think? I'm willing to find a better 
solution for this, but I wonder if it's worth the extra time.
   
   The reason this is made into a dict is to carry more information about 
downstream side inputs: specifically, which transforms will consume them. This 
is used to commit the side inputs to state after they are calculated (rather 
than before they are consumed), which will be necessary for streaming, because 
side inputs will need to be added to state as they are computed.
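Committing side inputs eagerly, as soon as the producing stage emits them, could look roughly like the sketch below. It assumes a map from PCollection id to the set of `SideInputId`s that consume it (the shape the comment describes), and an in-memory store standing in for runner state; `InMemorySideInputState` and `commit_side_inputs` are hypothetical names, not Beam APIs.

```python
from typing import Dict, Iterable, List, Set, Tuple

SideInputId = Tuple[str, str]  # (consumer ParDo, tag)


class InMemorySideInputState:
  """Hypothetical stand-in for the runner's state store."""

  def __init__(self) -> None:
    self._data: Dict[SideInputId, List[bytes]] = {}

  def append(self, key: SideInputId, values: Iterable[bytes]) -> None:
    self._data.setdefault(key, []).extend(values)

  def get(self, key: SideInputId) -> List[bytes]:
    return self._data.get(key, [])


def commit_side_inputs(produced: Dict[str, List[bytes]],
                       downstream_side_inputs: Dict[str, Set[SideInputId]],
                       state: InMemorySideInputState) -> None:
  """Writes freshly produced side-input data to state right away.

  `produced` maps PCollection ids to the elements a stage just emitted.
  Only PCollections listed in `downstream_side_inputs` are committed,
  once per consuming (transform, tag) pair; everything else is ignored.
  """
  for pcoll, elements in produced.items():
    for side_input_id in downstream_side_inputs.get(pcoll, ()):
      state.append(side_input_id, elements)
```

Appending rather than overwriting is what makes this usable for streaming: each new batch of side-input data is added to state as it is computed, without waiting for the consumer to be scheduled.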
 



Issue Time Tracking
---

Worklog Id: (was: 422451)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422442
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507669
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -149,15 +155,26 @@ def no_overlap(a, b):
     return (
         not consumer.forced_root and not self in consumer.must_follow and
         not self.is_runner_urn(context) and
-        not consumer.is_runner_urn(context) and
-        no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+        not consumer.is_runner_urn(context) and no_overlap(
+            set(self.downstream_side_inputs.keys()),
+            {i
+             for i, _, _ in consumer.side_inputs()}))
+
+  def _fuse_downstream_side_inputs(self, other):
+    res = dict(self.downstream_side_inputs)
+    for si, other_si_ids in other.downstream_side_inputs.items():
+      if si in res:
+        res[si] = union(res[si], other_si_ids)
+      else:
+        res[si] = other_si_ids
+    return res
 
   def fuse(self, other):
     # type: (Stage) -> Stage
     return Stage(
         "(%s)+(%s)" % (self.name, other.name),
         self.transforms + other.transforms,
-        union(self.downstream_side_inputs, other.downstream_side_inputs),
+        self._fuse_downstream_side_inputs(other),
 
 Review comment:
   Renamed to `_get_fused_downstream_side_inputs`. Thoughts?
 


Issue Time Tracking
---

Worklog Id: (was: 422442)
Time Spent: 2h 40m  (was: 2.5h)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422447
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507914
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -914,14 +898,16 @@ def process_bundle(self,
                      expected_outputs,  # type: DataOutput
                      fired_timers,  # type: Mapping[Tuple[str, str], PartitionableBuffer]
                      expected_output_timers  # type: Dict[str, Dict[str, str]]
+                     dry_run=False
 
 Review comment:
   Done
 


Issue Time Tracking
---

Worklog Id: (was: 422447)
Time Spent: 3.5h  (was: 3h 20m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422444=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422444
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507730
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
+    """Commit bundle outputs to state to be consumed as side inputs later.
+
+    Only the outputs that should be side inputs are committed to state.
+    """
+    data_side_input = {}  # type: DataSideInput
+    for pcoll, si_ids in self.stage.downstream_side_inputs.items():
+      for (consumer_transform_name, tag), access_pattern in si_ids.items():
+        data_side_input[consumer_transform_name, tag] = (
+            translations.create_buffer_id(pcoll), access_pattern)
+    self.execution_context.commit_side_inputs_to_state(data_side_input)
+
+  def extract_bundle_inputs(self):
+    # type: (...) -> Tuple[Dict[str, PartitionableBuffer], DataOutput]
+
+    """Returns maps of transform names to PCollection identifiers.
+
+    Also mutates IO stages to point to the data ApiServiceDescriptor.
+
+    Returns:
+      A tuple of (data_input, data_output) dictionaries.
+        `data_input` is a dictionary mapping (transform_name, output_name) to a
+        PCollection buffer; `data_output` is a dictionary mapping
+        (transform_name, output_name) to a PCollection ID.
+    """
+    data_input = {}  # type: Dict[str, PartitionableBuffer]
+    data_output = {}  # type: DataOutput
+    # A mapping of {(transform_id, timer_family_id) : buffer_id}
+    expected_timer_output = {}  # type: Dict[Tuple(str, str), str]
+    for transform in self.stage.transforms:
+      if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
+                                bundle_processor.DATA_OUTPUT_URN):
+        pcoll_id = transform.spec.payload
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.execution_context.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.execution_context.pipeline_context.coders[
+              self.execution_context.safe_coders.get(coder_id, coder_id)]
+          if pcoll_id == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(
+                coder_impl=coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            if pcoll_id not in self.execution_context.pcoll_buffers:
+              self.execution_context.pcoll_buffers[pcoll_id] = ListBuffer(
+                  coder_impl=coder.get_impl())
+            data_input[transform.unique_name] = \
+              self.execution_context.pcoll_buffers[pcoll_id]
+        elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          data_output[transform.unique_name] = pcoll_id
+          coder_id = self.execution_context.data_channel_coders[only_element(
+              transform.inputs.values())]
+        else:
+          raise NotImplementedError
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = \
+          self.data_api_service_descriptor()
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        for timer_family_id in payload.timer_family_specs.keys():
+          expected_timer_output[(transform.unique_name, timer_family_id)] = (
+              create_buffer_id(timer_family_id, 'timers'))
+    return data_input, data_output, expected_timer_output
 
 Review comment:
   done
 


Issue Time Tracking
---

Worklog Id: (was: 422444)
Time Spent: 3h  (was: 2h 50m)

> Abstract bundle execution logic from stage execution logic
> --
>
>

[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422445
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507780
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
+    """Commit bundle outputs to state to be consumed as side inputs later.
+
+    Only the outputs that should be side inputs are committed to state.
+    """
+    data_side_input = {}  # type: DataSideInput
+    for pcoll, si_ids in self.stage.downstream_side_inputs.items():
+      for (consumer_transform_name, tag), access_pattern in si_ids.items():
+        data_side_input[consumer_transform_name, tag] = (
+            translations.create_buffer_id(pcoll), access_pattern)
+    self.execution_context.commit_side_inputs_to_state(data_side_input)
+
+  def extract_bundle_inputs(self):
 
 Review comment:
   done
 


Issue Time Tracking
---

Worklog Id: (was: 422445)
Time Spent: 3h 10m  (was: 3h)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422446
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507854
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -326,8 +327,8 @@ def commit_side_inputs_to_state(
       data_side_input,  # type: DataSideInput
   ):
     # type: (...) -> None
-    for (consuming_transform_id, tag), (buffer_id, func_spec) \
-        in data_side_input.items():
+    for (consuming_transform_id, tag), (buffer_id,
 
 Review comment:
   it did not : ( hehe
 


Issue Time Tracking
---

Worklog Id: (was: 422446)
Time Spent: 3h 20m  (was: 3h 10m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422443
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:13
Start Date: 15/Apr/20 00:13
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507693
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
 
 Review comment:
   done
 


Issue Time Tracking
---

Worklog Id: (was: 422443)
Time Spent: 2h 50m  (was: 2h 40m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422441
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:12
Start Date: 15/Apr/20 00:12
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507610
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
 
 Review comment:
   The value is a tuple with the encoded data. I've moved these to 
translations.py, and updated the comment
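The sketch below makes the alias concrete. It defines `DataSideInput` and a consumer that unpacks both the key and the value in the loop header, assuming the value pairs a buffer ID with an access-pattern spec as in the diff. `FunctionSpec` here is a hypothetical frozen dataclass standing in for `beam_runner_api_pb2.FunctionSpec`, and `describe` is illustrative only.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class FunctionSpec:
  # Hypothetical stand-in for beam_runner_api_pb2.FunctionSpec.
  urn: str


# SideInputId is identified by a consumer ParDo + tag.
SideInputId = Tuple[str, str]

# Value: (ID of the buffer holding the encoded side-input data, access pattern).
DataSideInput = Dict[SideInputId, Tuple[bytes, FunctionSpec]]


def describe(data_side_input: DataSideInput) -> List[str]:
  """Renders one line per side input, ordered by (consumer, tag)."""
  lines = []
  # Nested tuple unpacking in the loop header; no backslash continuation needed.
  for (consumer, tag), (buffer_id, access_pattern) in sorted(
      data_side_input.items(), key=lambda item: item[0]):
    lines.append('%s/%s <- %s [%s]' % (
        consumer, tag, buffer_id.decode('utf-8'), access_pattern.urn))
  return lines
```

Sorting by key alone avoids ever comparing two `FunctionSpec` values, which define no ordering.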
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 422441)
Time Spent: 2.5h  (was: 2h 20m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=422440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-422440
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 15/Apr/20 00:12
Start Date: 15/Apr/20 00:12
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r408507558
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -149,15 +155,26 @@ def no_overlap(a, b):
     return (
         not consumer.forced_root and not self in consumer.must_follow and
         not self.is_runner_urn(context) and
-        not consumer.is_runner_urn(context) and
-        no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+        not consumer.is_runner_urn(context) and no_overlap(
+            set(self.downstream_side_inputs.keys()),
+            {i
+             for i, _, _ in consumer.side_inputs()}))
+
+  def _fuse_downstream_side_inputs(self, other):
+    res = dict(self.downstream_side_inputs)
+    for si, other_si_ids in other.downstream_side_inputs.items():
+      if si in res:
+        res[si] = union(res[si], other_si_ids)
 
 Review comment:
   Whoa, this is a bug.
   `downstream_side_inputs` is a dictionary mapping to dictionaries:
   
   Dict[Output PCollection, Dict[Side input ID, Access pattern]]
   where Side input ID is Tuple[consumer PTransform, input index]. I added the
appropriate annotations and fixed the bug.
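The sketch below merges two stages' maps in the way the fix describes, assuming the nested shape above with access patterns simplified to strings. Inner dicts are merged into fresh dicts, so neither input stage is mutated. `fuse_downstream_side_inputs` and `side_inputs_allow_fusion` are hypothetical names, not the code in the PR.

```python
from typing import Dict, Iterable, Tuple

SideInputId = Tuple[str, str]  # (consumer transform, input tag)
# Output PCollection -> {SideInputId: access pattern}; pattern simplified to str.
DownstreamSideInputs = Dict[str, Dict[SideInputId, str]]


def fuse_downstream_side_inputs(
    a: DownstreamSideInputs, b: DownstreamSideInputs) -> DownstreamSideInputs:
  """Merges two stages' maps without mutating either input."""
  res = dict(a)
  for pcoll, other_si_ids in b.items():
    if pcoll in res:
      # Merge the inner dicts; treating them as sets would lose the patterns.
      merged = dict(res[pcoll])
      merged.update(other_si_ids)
      res[pcoll] = merged
    else:
      res[pcoll] = other_si_ids
  return res


def side_inputs_allow_fusion(
    downstream: DownstreamSideInputs, consumer_side_inputs: Iterable[str]) -> bool:
  """True if the consumer reads none of the tracked PCollections as side inputs."""
  return not set(downstream).intersection(consumer_side_inputs)
```

As in the diff, the fusion check looks only at the outer keys. The inner dicts exist so that the consumers can be identified later when the side inputs are committed.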
 


Issue Time Tracking
---

Worklog Id: (was: 422440)
Time Spent: 2h 20m  (was: 2h 10m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421587=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421587
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407670612
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
+                     Tuple[bytes, beam_runner_api_pb2.FunctionSpec]]
+
 
 class Stage(object):
   """A set of Transforms that can be sent to the worker for processing."""
   def __init__(self,
                name,  # type: str
                transforms,  # type: List[beam_runner_api_pb2.PTransform]
-               downstream_side_inputs=None,  # type: Optional[FrozenSet[str]]
+               downstream_side_inputs=None,  # type: Optional[Dict[str, SideInputId]]
 
 Review comment:
   The goal of this (which, yes, should have been better documented) is to 
quickly be able to prohibit fusion. But the reason we defined our own union was 
so that memory didn't grow as O(n^2) in the common case because many stages 
were able to share this set (rather than have their own copy). These changes 
seem to break that. 
   
   Also, could you clarify why this was made into a dict? 
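The memory argument can be illustrated with frozensets. If `union` returns one of its arguments whenever that argument already contains the other, a chain of fused stages keeps pointing at a single shared object instead of each stage holding its own copy. This is a minimal sketch of the idea, not Beam's `union`.

```python
from typing import FrozenSet


def union(a: FrozenSet[str], b: FrozenSet[str]) -> FrozenSet[str]:
  """Returns an existing argument whenever possible so stages share sets."""
  if b <= a:  # Covers b empty and a == b.
    return a
  if a <= b:  # Covers a empty.
    return b
  return a | b  # Only genuinely new combinations allocate a new set.


if __name__ == '__main__':
  shared = frozenset({'side_pc'})
  acc = frozenset()
  for stage_set in [shared, frozenset(), shared, frozenset()]:
    acc = union(acc, stage_set)
  print(acc is shared)  # True: every step reused an existing set.
```

A per-stage `dict(...)` copy, by contrast, creates a new object on every fusion. That is the growth the comment is concerned about.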
 


Issue Time Tracking
---

Worklog Id: (was: 421587)
Time Spent: 2h 10m  (was: 2h)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421584
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407673895
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -914,14 +898,16 @@ def process_bundle(self,
                      expected_outputs,  # type: DataOutput
                      fired_timers,  # type: Mapping[Tuple[str, str], PartitionableBuffer]
                      expected_output_timers  # type: Dict[str, Dict[str, str]]
+                     dry_run=False
 
 Review comment:
   This should be the default, we shouldn't have to pass it.
 


Issue Time Tracking
---

Worklog Id: (was: 421584)
Time Spent: 2h  (was: 1h 50m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421582
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407663916
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
+    """Commit bundle outputs to state to be consumed as side inputs later.
+
+    Only the outputs that should be side inputs are committed to state.
+    """
+    data_side_input = {}  # type: DataSideInput
+    for pcoll, si_ids in self.stage.downstream_side_inputs.items():
+      for (consumer_transform_name, tag), access_pattern in si_ids.items():
+        data_side_input[consumer_transform_name, tag] = (
+            translations.create_buffer_id(pcoll), access_pattern)
+    self.execution_context.commit_side_inputs_to_state(data_side_input)
+
+  def extract_bundle_inputs(self):
+    # type: (...) -> Tuple[Dict[str, PartitionableBuffer], DataOutput]
+
+    """Returns maps of transform names to PCollection identifiers.
+
+    Also mutates IO stages to point to the data ApiServiceDescriptor.
+
+    Returns:
+      A tuple of (data_input, data_output) dictionaries.
+        `data_input` is a dictionary mapping (transform_name, output_name) to a
+        PCollection buffer; `data_output` is a dictionary mapping
+        (transform_name, output_name) to a PCollection ID.
+    """
+    data_input = {}  # type: Dict[str, PartitionableBuffer]
+    data_output = {}  # type: DataOutput
+    # A mapping of {(transform_id, timer_family_id) : buffer_id}
+    expected_timer_output = {}  # type: Dict[Tuple(str, str), str]
+    for transform in self.stage.transforms:
+      if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
+                                bundle_processor.DATA_OUTPUT_URN):
+        pcoll_id = transform.spec.payload
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.execution_context.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.execution_context.pipeline_context.coders[
+              self.execution_context.safe_coders.get(coder_id, coder_id)]
+          if pcoll_id == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(
+                coder_impl=coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            if pcoll_id not in self.execution_context.pcoll_buffers:
+              self.execution_context.pcoll_buffers[pcoll_id] = ListBuffer(
+                  coder_impl=coder.get_impl())
+            data_input[transform.unique_name] = \
+              self.execution_context.pcoll_buffers[pcoll_id]
+        elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          data_output[transform.unique_name] = pcoll_id
+          coder_id = self.execution_context.data_channel_coders[only_element(
+              transform.inputs.values())]
+        else:
+          raise NotImplementedError
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = \
+          self.data_api_service_descriptor()
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        for timer_family_id in payload.timer_family_specs.keys():
+          expected_timer_output[(transform.unique_name, timer_family_id)] = (
+              create_buffer_id(timer_family_id, 'timers'))
+    return data_input, data_output, expected_timer_output
 
 Review comment:
   Update docs to match. 
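The sketch below is a simplified version of what `extract_bundle_inputs` computes, with a docstring that describes all three returned maps. It assumes plain lists as buffers, a hypothetical `Transform` tuple in place of the `PTransform` proto, placeholder URN strings, and a `timers:` buffer-ID format chosen for illustration. The gRPC data-spec rewriting from the diff is omitted.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple

# Placeholder URNs; the runner uses bundle_processor / translations constants.
DATA_INPUT_URN = 'example:data_input'
DATA_OUTPUT_URN = 'example:data_output'
PAR_DO_URN = 'example:pardo'
IMPULSE_BUFFER = b'impulse'
ENCODED_IMPULSE_VALUE = b'\x00'  # Placeholder bytes for the impulse element.


class Transform(NamedTuple):
  # Hypothetical, simplified stand-in for beam_runner_api_pb2.PTransform.
  unique_name: str
  urn: str
  payload: bytes = b''
  timer_family_ids: Tuple[str, ...] = ()


def extract_bundle_inputs_and_outputs(
    transforms: Iterable[Transform],
    pcoll_buffers: Dict[bytes, List[bytes]]):
  """Classifies a stage's transforms.

  Returns:
    A tuple (data_input, data_output, expected_timer_output):
      data_input maps a data-input transform name to its buffer;
      data_output maps a data-output transform name to a PCollection ID;
      expected_timer_output maps (transform name, timer family ID) to a
      timer buffer ID.
  """
  data_input: Dict[str, List[bytes]] = {}
  data_output: Dict[str, bytes] = {}
  expected_timer_output: Dict[Tuple[str, str], bytes] = {}
  for t in transforms:
    if t.urn == DATA_INPUT_URN:
      if t.payload == IMPULSE_BUFFER:
        # Impulse gets a fresh buffer seeded with a single element.
        data_input[t.unique_name] = [ENCODED_IMPULSE_VALUE]
      else:
        # Other inputs share one buffer per PCollection, created on first use.
        data_input[t.unique_name] = pcoll_buffers.setdefault(t.payload, [])
    elif t.urn == DATA_OUTPUT_URN:
      data_output[t.unique_name] = t.payload
    elif t.urn == PAR_DO_URN:
      for timer_family_id in t.timer_family_ids:
        expected_timer_output[t.unique_name, timer_family_id] = (
            ('timers:%s' % timer_family_id).encode('utf-8'))
  return data_input, data_output, expected_timer_output
```

Sharing the per-PCollection buffer through `setdefault` mirrors the diff's check against `pcoll_buffers` before creating a new `ListBuffer`.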
 


Issue Time Tracking
---

Worklog Id: (was: 421582)
Time Spent: 1h 50m  (was: 1h 40m)

> Abstract bundle execution logic from stage execution logic
> 

[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421583
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407667700
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -75,21 +75,27 @@
 
 IMPULSE_BUFFER = b'impulse'
 
+# SideInputId is identified by a consumer ParDo + tag.
+SideInputId = Tuple[str, str]
+
+DataSideInput = Dict[SideInputId,
 
 Review comment:
   What does the value represent? 
 


Issue Time Tracking
---

Worklog Id: (was: 421583)
Time Spent: 1h 50m  (was: 1h 40m)

> Abstract bundle execution logic from stage execution logic
> --
>
> Key: BEAM-9639
> URL: https://issues.apache.org/jira/browse/BEAM-9639
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The FnApiRunner currently works on a per-stage manner, and does not abstract 
> single-bundle execution much. This work item is to clearly define the code to 
> execute a single bundle.





[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421585=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421585
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407668033
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -149,15 +155,26 @@ def no_overlap(a, b):
     return (
         not consumer.forced_root and not self in consumer.must_follow and
         not self.is_runner_urn(context) and
-        not consumer.is_runner_urn(context) and
-        no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+        not consumer.is_runner_urn(context) and no_overlap(
+            set(self.downstream_side_inputs.keys()),
+            {i
+             for i, _, _ in consumer.side_inputs()}))
+
+  def _fuse_downstream_side_inputs(self, other):
+    res = dict(self.downstream_side_inputs)
+    for si, other_si_ids in other.downstream_side_inputs.items():
+      if si in res:
+        res[si] = union(res[si], other_si_ids)
 
 Review comment:
   So this is actually a dict mapping to sets? 
 


Issue Time Tracking
---

Worklog Id: (was: 421585)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421586=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421586
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407666877
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -149,15 +155,26 @@ def no_overlap(a, b):
     return (
         not consumer.forced_root and not self in consumer.must_follow and
         not self.is_runner_urn(context) and
-        not consumer.is_runner_urn(context) and
-        no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+        not consumer.is_runner_urn(context) and no_overlap(
+            set(self.downstream_side_inputs.keys()),
+            {i
+             for i, _, _ in consumer.side_inputs()}))
+
+  def _fuse_downstream_side_inputs(self, other):
+    res = dict(self.downstream_side_inputs)
+    for si, other_si_ids in other.downstream_side_inputs.items():
+      if si in res:
+        res[si] = union(res[si], other_si_ids)
+      else:
+        res[si] = other_si_ids
+    return res
 
   def fuse(self, other):
     # type: (Stage) -> Stage
     return Stage(
         "(%s)+(%s)" % (self.name, other.name),
         self.transforms + other.transforms,
-        union(self.downstream_side_inputs, other.downstream_side_inputs),
+        self._fuse_downstream_side_inputs(other),
 
 Review comment:
   Nit: this sounds like it mutates self.
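
One way to act on the nit is a module-level function whose name and signature make clear that it returns a new mapping. The sketch assumes that each value is itself a mapping of `(transform_name, tag)` to an access pattern, which the `commit_output_views_to_state` hunk in this PR suggests through `si_ids.items()`. `merge_downstream_side_inputs` is a hypothetical name. The inner merge is a plain dict update that stands in for the `union` helper used in the diff.

```python
from typing import Dict, Mapping, Tuple

Key = Tuple[str, str]  # (consumer transform name, tag), assumed


def merge_downstream_side_inputs(
    a: Mapping[str, Mapping[Key, str]],
    b: Mapping[str, Mapping[Key, str]]) -> Dict[str, Dict[Key, str]]:
  """Returns a new merged mapping; neither argument is modified."""
  # Copy the inner mappings as well, so later edits to the result
  # cannot leak back into either input.
  res = {pcoll: dict(ids) for pcoll, ids in a.items()}
  for pcoll, other_ids in b.items():
    if pcoll in res:
      res[pcoll].update(other_ids)  # entries from b win on key clashes
    else:
      res[pcoll] = dict(other_ids)
  return res


left = {'pc1': {('A', 'side0'): 'iterable'}}
right = {'pc1': {('B', 'side0'): 'multimap'}, 'pc2': {('C', 's'): 'iterable'}}
merged = merge_downstream_side_inputs(left, right)
print(merged['pc1'])  # {('A', 'side0'): 'iterable', ('B', 'side0'): 'multimap'}
print(left)  # {'pc1': {('A', 'side0'): 'iterable'}}
```

This sketch copies the inner dicts because it updates them in place with `dict.update`. Code that only reassigns values, as the diff does with `res[si] = union(...)`, does not need the deeper copy, as long as the helper does not mutate its arguments.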
 



Issue Time Tracking
---

Worklog Id: (was: 421586)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421579=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421579
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407620241
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -326,8 +327,8 @@ def commit_side_inputs_to_state(
   data_side_input,  # type: DataSideInput
   ):
 # type: (...) -> None
-for (consuming_transform_id, tag), (buffer_id, func_spec) \
-in data_side_input.items():
+for (consuming_transform_id, tag), (buffer_id,
 
 Review comment:
   I wonder if 
   
   `((consuming_transform_id, tag), (buffer_id, func_spec))`
   
   would make both yapf and humans happy.
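
Python accepts a parenthesized target list in a `for` header. The suggested form is therefore valid syntax, and the header can wrap inside the parentheses instead of using a trailing backslash. Below is a small sketch with a hypothetical mapping shaped like `DataSideInput`. Its keys are `(consuming_transform_id, tag)` and its values are `(buffer_id, func_spec)`. The sample values are invented.

```python
from typing import Dict, List, Tuple

# Hypothetical mapping shaped like DataSideInput:
# (consuming_transform_id, tag) -> (buffer_id, func_spec)
data_side_input: Dict[Tuple[str, str], Tuple[bytes, str]] = {
    ('ParDo(B)', 'side0'): (b'materialize:pc_view', 'iterable'),
}

seen: List[Tuple[str, str, bytes, str]] = []
# The outer parentheses make the whole target one bracketed expression,
# so the header can break across lines without a backslash.
for ((consuming_transform_id, tag),
     (buffer_id, func_spec)) in data_side_input.items():
  seen.append((consuming_transform_id, tag, buffer_id, func_spec))

print(seen)  # [('ParDo(B)', 'side0', b'materialize:pc_view', 'iterable')]
```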
 



Issue Time Tracking
---

Worklog Id: (was: 421579)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421581=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421581
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407664777
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
 
 Review comment:
   Not sure what "output views" means. Maybe call this 
commit_side_inputs_to_state as well? 
 



Issue Time Tracking
---

Worklog Id: (was: 421581)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-13 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=421580=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421580
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:38
Start Date: 13/Apr/20 19:38
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#discussion_r407663602
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -367,6 +413,73 @@ def _build_process_bundle_descriptor(self):
         state_api_service_descriptor=self.state_api_service_descriptor(),
         timer_api_service_descriptor=self.data_api_service_descriptor())
 
+  def commit_output_views_to_state(self):
+    """Commit bundle outputs to state to be consumed as side inputs later.
+
+    Only the outputs that should be side inputs are committed to state.
+    """
+    data_side_input = {}  # type: DataSideInput
+    for pcoll, si_ids in self.stage.downstream_side_inputs.items():
+      for (consumer_transform_name, tag), access_pattern in si_ids.items():
+        data_side_input[consumer_transform_name, tag] = (
+            translations.create_buffer_id(pcoll), access_pattern)
+    self.execution_context.commit_side_inputs_to_state(data_side_input)
+
+  def extract_bundle_inputs(self):
 
 Review comment:
   extract_bundle_inputs_and_outputs?
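
The `commit_output_views_to_state` body in the hunk above flattens a two-level mapping into a one-level mapping. The input maps each PCollection id to `{(consumer_transform_name, tag): access_pattern}`. The output is keyed by `(consumer_transform_name, tag)`. The standalone sketch below reproduces only that reshaping. `buffer_id_for` is a hypothetical stand-in for `translations.create_buffer_id`, and the sample data is invented.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]  # (consumer transform name, tag)


def buffer_id_for(pcoll: str) -> bytes:
  # Hypothetical stand-in for translations.create_buffer_id(pcoll).
  return ('materialize:%s' % pcoll).encode('utf-8')


def flatten_side_inputs(
    downstream_side_inputs: Dict[str, Dict[Key, str]]
) -> Dict[Key, Tuple[bytes, str]]:
  data_side_input = {}  # type: Dict[Key, Tuple[bytes, str]]
  for pcoll, si_ids in downstream_side_inputs.items():
    for (consumer_transform_name, tag), access_pattern in si_ids.items():
      # One entry per consuming (transform, tag) pair, paired with the
      # buffer id derived from the producing PCollection.
      data_side_input[consumer_transform_name, tag] = (
          buffer_id_for(pcoll), access_pattern)
  return data_side_input


flat = flatten_side_inputs({
    'pc_view': {('ParDo(B)', 'side0'): 'iterable',
                ('ParDo(C)', 'side1'): 'multimap'},
})
print(flat[('ParDo(C)', 'side1')])  # (b'materialize:pc_view', 'multimap')
```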
 



Issue Time Tracking
---

Worklog Id: (was: 421580)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-11 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=420825=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420825
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 11/Apr/20 21:08
Start Date: 11/Apr/20 21:08
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-612513921
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 420825)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-11 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=420813=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420813
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 11/Apr/20 19:32
Start Date: 11/Apr/20 19:32
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-612497517
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 420813)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-11 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=420812=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420812
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 11/Apr/20 19:31
Start Date: 11/Apr/20 19:31
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-612497313
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 420812)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-10 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=420620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420620
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 10/Apr/20 23:57
Start Date: 10/Apr/20 23:57
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-612271558
 
 
   attempting to rebase. let's see how far that takes me...
 



Issue Time Tracking
---

Worklog Id: (was: 420620)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-06 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=417426=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-417426
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 07/Apr/20 03:17
Start Date: 07/Apr/20 03:17
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-610150292
 
 
   @robertwb ptal
 



Issue Time Tracking
---

Worklog Id: (was: 417426)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-01 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=414282=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414282
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 01/Apr/20 20:37
Start Date: 01/Apr/20 20:37
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-607477435
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 414282)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-9639) Abstract bundle execution logic from stage execution logic

2020-04-01 ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9639?focusedWorklogId=414225=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414225
 ]

ASF GitHub Bot logged work on BEAM-9639:


Author: ASF GitHub Bot
Created on: 01/Apr/20 18:50
Start Date: 01/Apr/20 18:50
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11270: 
[BEAM-9639][BEAM-9608] Improvements for FnApiRunner
URL: https://github.com/apache/beam/pull/11270#issuecomment-607428460
 
 
   this latest commit is purely aesthetic
 



Issue Time Tracking
---

Worklog Id: (was: 414225)
Remaining Estimate: 0h
Time Spent: 10m
