[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421682&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421682
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 22:02
Start Date: 13/Apr/20 22:02
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #11407: [BEAM-9562] 
Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421682)
Time Spent: 24h 20m  (was: 24h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 24h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421626
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 20:43
Start Date: 13/Apr/20 20:43
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11407: [BEAM-9562] 
Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407#issuecomment-613088556
 
 
   R: @ibzib 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421626)
Time Spent: 24h 10m  (was: 24h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 24h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421625
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 20:42
Start Date: 13/Apr/20 20:42
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11407: [BEAM-9562] 
Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407
 
 
   …hen in the event time domain.
   
   (cherry picked from commit 009578e374523f5acd8d24543ef1ceec30542a95)
   
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostComm

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421624&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421624
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 20:36
Start Date: 13/Apr/20 20:36
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #11314: [BEAM-9562] Send Timers 
over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613085672
 
 
   I was actually working on something related to timers in #11362 and was 
surprised to see that the test failed when I opened the PR, since I had run 
tests locally. Then figured something must have changed on master in the 
meantime. Thanks for following up with this!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421624)
Time Spent: 23h 50m  (was: 23h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421617
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 20:30
Start Date: 13/Apr/20 20:30
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #11402: [BEAM-9562] Fix output 
timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613082627
 
 
   Thanks for correcting this!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421617)
Time Spent: 23h 40m  (was: 23.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421597
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 19:57
Start Date: 13/Apr/20 19:57
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11402: [BEAM-9562] 
Fix output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421597)
Time Spent: 23.5h  (was: 23h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421537
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11402: [BEAM-9562] Fix 
output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036333
 
 
   Run Java Spark PortableValidatesRunner Batch
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421537)
Time Spent: 23h  (was: 22h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421538
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix 
output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036337
 
 
   Run Java Flink PortableValidatesRunner Streaming
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421538)
Time Spent: 23h 10m  (was: 23h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421536&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421536
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix 
output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036251
 
 
   Run Java Flink PortableValidatesRunner Batch
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421536)
Time Spent: 22h 50m  (was: 22h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 22h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421539
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix 
output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036374
 
 
   Run Java Spark PortableValidatesRunner Batch
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421539)
Time Spent: 23h 20m  (was: 23h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 23h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421535
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:43
Start Date: 13/Apr/20 18:43
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix 
output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613035835
 
 
   R: @boyuanzz @robertwb 
   CC: @ibzib We'll need this for 2.21.0 release.
   CC: @mxm
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421535)
Time Spent: 22h 40m  (was: 22.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 22h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421531&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421531
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:41
Start Date: 13/Apr/20 18:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11402: [BEAM-9562] 
Fix output timestamp to be inferred from scheduled time when in the event time 
domain.
URL: https://github.com/apache/beam/pull/11402
 
 
   I copied the setting/validation logic from the [SimpleDoFnRunner 
Timers](https://github.com/apache/beam/blob/296f5a74f981c9023a54e1c2c89db7ee8e6b428a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L844)
 implementation.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421532
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 18:41
Start Date: 13/Apr/20 18:41
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613034995
 
 
   The problem is with the Timer implementation inside the FnApiDoFnRunner. The 
spec for Timer wasn't clear as to what the defaults were when 
withOutputTimestamp was added and hence some critical logic was deleted during 
the migration.
   
   See #11402 for the fix.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421532)
Time Spent: 22.5h  (was: 22h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 22.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421463&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421463
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 17:38
Start Date: 13/Apr/20 17:38
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613006148
 
 
   > This is a big change which also affects the runners. Would it have made 
sense to notify Runner authors, especially since post commit tests are broken? 
It took me a bit to figure out what caused the regression.
   
   Thanks, Max! Sorry for the inconvenience. It seems like currently both Spark 
and Flink fail on the same test: 
org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignBounded.
 The failure pattern is also the same: the pipeline only produces the output 
from timer, not from the ProcessElement fn. I think there should be something 
wrong in the java runner shared library code. Have you worked on it? Or do you 
want me to follow up fixing this issue?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421463)
Time Spent: 22h 10m  (was: 22h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 22h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421456
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 17:26
Start Date: 13/Apr/20 17:26
Worklog Time Spent: 10m 
  Work Description: ibzib commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613000822
 
 
   @mxm Which post commits are you referring to? & Can you please mark the 
jira(s) with fix version 2.21.0 so we can fix the regression in the release?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421456)
Time Spent: 22h  (was: 21h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 22h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421198
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 13/Apr/20 09:24
Start Date: 13/Apr/20 09:24
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #11314: [BEAM-9562] Send Timers 
over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-612825390
 
 
   This is a big change which also affects the runners. Would it have made 
sense to notify Runner authors, especially since post commit tests are broken? 
It took me a bit to figure out what caused the regression.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 421198)
Time Spent: 21h 50m  (was: 21h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420517
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 21:26
Start Date: 10/Apr/20 21:26
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] 
Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 420517)
Time Spent: 21h 40m  (was: 21.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420462
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 20:07
Start Date: 10/Apr/20 20:07
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] 
Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373#discussion_r406921308
 
 

 ##
 File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go
 ##
 @@ -6242,11 +6289,11 @@ var fileDescriptor_cf57597c3a9659a9 = []byte{
 
 // Reference imports to suppress errors if they are not otherwise used.
 var _ context.Context
-var _ grpc.ClientConn
+var _ grpc.ClientConnInterface
 
 Review comment:
   It's fixed by following 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/model/PROTOBUF.md#generated-go-code-fails-to-build
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 420462)
Time Spent: 21.5h  (was: 21h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420456
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 19:59
Start Date: 10/Apr/20 19:59
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] 
Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373#discussion_r406918619
 
 

 ##
 File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go
 ##
 @@ -6242,11 +6289,11 @@ var fileDescriptor_cf57597c3a9659a9 = []byte{
 
 // Reference imports to suppress errors if they are not otherwise used.
 var _ context.Context
-var _ grpc.ClientConn
+var _ grpc.ClientConnInterface
 
 Review comment:
   @lostluck I'm not sure why this line and L6296 were changed. Do you have any 
idea?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 420456)
Time Spent: 21h 20m  (was: 21h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420445
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 19:41
Start Date: 10/Apr/20 19:41
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11373: [BEAM-9562] 
Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373#discussion_r406912246
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -516,7 +516,7 @@ message Elements {
   repeated Data data = 1;
 
   // (Optional)  A list of timer byte streams.
-  repeated Timer timer = 2;
+  repeated Timer timers = 2;
 
 Review comment:
   Both.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 420445)
Time Spent: 21h 10m  (was: 21h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419957
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 02:01
Start Date: 10/Apr/20 02:01
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11373: [BEAM-9562] 
Update Element.timer to Element.timers
URL: https://github.com/apache/beam/pull/11373#discussion_r406567717
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -516,7 +516,7 @@ message Elements {
   repeated Data data = 1;
 
   // (Optional)  A list of timer byte streams.
-  repeated Timer timer = 2;
+  repeated Timer timers = 2;
 
 Review comment:
   @robertwb did you want to rename this field or the proto message Timer -> 
Timers or both?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419957)
Time Spent: 21h  (was: 20h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 21h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419956
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 02:00
Start Date: 10/Apr/20 02:00
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11373: [BEAM-9562] Update 
Element.timer to Element.timers
URL: https://github.com/apache/beam/pull/11373#issuecomment-611839103
 
 
   please regenerate the go protos
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419956)
Time Spent: 20h 50m  (was: 20h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 20h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419923
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 00:51
Start Date: 10/Apr/20 00:51
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] 
Update Element.timer to Element.timers
URL: https://github.com/apache/beam/pull/11373
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419907
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 10/Apr/20 00:19
Start Date: 10/Apr/20 00:19
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419907)
Time Spent: 20.5h  (was: 20h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 20.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419796
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:36
Start Date: 09/Apr/20 21:36
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406491316
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -536,7 +525,8 @@ def _run_stage(self,
 runner_execution_context,
 bundle_context_manager,
 data_input,
-data_output,
+data_output, {},
 
 Review comment:
   yapf helps me put the {} here.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419796)
Time Spent: 20h 20m  (was: 20h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 20h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419793
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:34
Start Date: 09/Apr/20 21:34
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490454
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1272,6 +1272,8 @@ def expand(self, pcoll):
 key_coder = coder.key_coder()
   else:
 key_coder = coders.registry.get_coder(typehints.Any)
+  self.window_coder = pcoll.windowing.windowfn.get_window_coder()
 
 Review comment:
   No. Will removed.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419793)
Time Spent: 19h 50m  (was: 19h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419795
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:34
Start Date: 09/Apr/20 21:34
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490556
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -987,8 +1019,10 @@ def __init__(
 
   def process_bundle(self,
  inputs,  # type: Mapping[str, PartitionableBuffer]
- expected_outputs  # type: DataOutput
-):
+ expected_outputs,  # type: DataOutput
+ fired_timers,  # type: Mapping[str, Mapping[str, 
PartitionableBuffer]]
 
 Review comment:
   I updated the `fired_timers` implementation but forgot to update the typing 
here. Thanks!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419795)
Time Spent: 20h 10m  (was: 20h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 20h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419794
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:34
Start Date: 09/Apr/20 21:34
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406490504
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id):
 op.execution_context = execution_context
 op.start()
 
-  # Inject inputs from data plane.
+  # Each data_channel is mapped to a list of expected inputs which includes
+  # both data input and timer input. The data input is identied by
+  # transform_id. The data input is identified by
+  # (transform_id, timer_family_id).
   data_channels = collections.defaultdict(
   list
   )  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]]
+
+  # Inject data inputs from data plane.
 
 Review comment:
   Updated the comment.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419794)
Time Spent: 20h  (was: 19h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 20h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419780
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:15
Start Date: 09/Apr/20 21:15
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406481820
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
 
 Review comment:
   We can delete this override since we pass `extra_kwargs` from `PTransform` 
now. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419780)
Time Spent: 19h 40m  (was: 19.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419762
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406468331
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -536,7 +525,8 @@ def _run_stage(self,
 runner_execution_context,
 bundle_context_manager,
 data_input,
-data_output,
+data_output, {},
 
 Review comment:
   Put {} on its own line. (Surprised yapf didn't complain, or maybe you 
haven't run it yet.)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419762)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419758&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419758
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406444781
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
 
 Review comment:
   This code looks like it's copied from the superclass, instead just do
   
   ```
   def to_runner_api(self, context, named_inputs, **extra_kwargs):
 super(ParDo, self).to_runner_api, named_inputs=named_inputs, 
**extra_kwargs)
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419758)
Time Spent: 19h 20m  (was: 19h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419759&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419759
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406473230
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -914,6 +926,17 @@ def process_bundle(self,
 
 split_manager = self._select_split_manager()
 if not split_manager:
+  # Send the fired timers if any.
+  for (transform_id, timer_family_id), timers in fired_timers.items():
+self._send_timers_to_worker(
+process_bundle_id, transform_id, timer_family_id, timers)
+
+  for transform_id, timer_family_id in (
+  set(expected_output_timers.keys()) - set(fired_timers.keys())):
+# Close the stream if there is no timers to be sent.
 
 Review comment:
   This is a subtle point. I might write something like "The worker waits for a 
logical timer stream to be closed for every possible timer, regardless of 
whether there are any timers to be sent."
   
   Maybe it'd be clearer to iterate over `expected_output_timers`, and send 
`fired_timers.get((transform_id, timer_family_id), [])`. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419759)
Time Spent: 19.5h  (was: 19h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419760&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419760
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406474463
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -355,20 +364,41 @@ def _build_process_bundle_descriptor(self):
 items()),
 environments=dict(
 self.execution_context.pipeline_components.environments.items()),
-state_api_service_descriptor=self.state_api_service_descriptor())
+state_api_service_descriptor=self.state_api_service_descriptor(),
+timer_api_service_descriptor=self.data_api_service_descriptor())
 
   def get_input_coder_impl(self, transform_id):
 # type: (str) -> CoderImpl
 coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString(
 self.process_bundle_descriptor.transforms[transform_id].spec.payload
 ).coder_id
 assert coder_id
+return self.get_coder_impl(coder_id)
+
+  def _build_timer_coders_id_map(self):
+timer_coder_ids = {}
+for transform_id, transform_proto in (self._process_bundle_descriptor
+.transforms.items()):
+  if transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn:
+pardo_payload = proto_utils.parse_Bytes(
+transform_proto.spec.payload, beam_runner_api_pb2.ParDoPayload)
+for id, timer_family_spec in pardo_payload.timer_family_specs.items():
+  timer_coder_ids[(transform_id, id)] = (
+  timer_family_spec.timer_family_coder_id)
+return timer_coder_ids
+
+  def get_coder_impl(self, coder_id):
 if coder_id in self.execution_context.safe_coders:
   return self.execution_context.pipeline_context.coders[
   self.execution_context.safe_coders[coder_id]].get_impl()
 else:
   return 
self.execution_context.pipeline_context.coders[coder_id].get_impl()
 
+  def get_timer_coder_impl(self, transform_id, timer_family_id):
+assert (transform_id, timer_family_id) in self._timer_coder_ids
 
 Review comment:
   The key error if it's not present below will be sufficient. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419760)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419755&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419755
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406465514
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id):
 op.execution_context = execution_context
 op.start()
 
-  # Inject inputs from data plane.
+  # Each data_channel is mapped to a list of expected inputs which includes
+  # both data input and timer input. The data input is identied by
+  # transform_id. The data input is identified by
+  # (transform_id, timer_family_id).
   data_channels = collections.defaultdict(
   list
   )  # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]]
+
+  # Inject data inputs from data plane.
 
 Review comment:
   This comment is a bit misleading, as the injection doesn't happen in this 
for loop. (Similarly with timers.)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419755)
Time Spent: 19h 10m  (was: 19h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419757
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406467481
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -987,8 +1019,10 @@ def __init__(
 
   def process_bundle(self,
  inputs,  # type: Mapping[str, PartitionableBuffer]
- expected_outputs  # type: DataOutput
-):
+ expected_outputs,  # type: DataOutput
+ fired_timers,  # type: Mapping[str, Mapping[str, 
PartitionableBuffer]]
 
 Review comment:
   For consistency, should this be a `Mapping[Tuple[str, str], 
PartitionableBuffer]`?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419757)
Time Spent: 19h 20m  (was: 19h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419756
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406466215
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
 ##
 @@ -1321,80 +1321,18 @@ def remove_data_plane_ops(stages, pipeline_context):
   yield stage
 
 
-def inject_timer_pcollections(stages, pipeline_context):
+def setup_timer_mapping(stages, pipeline_context):
   # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
 
-  """Create PCollections for fired timers and to-be-set timers.
-
-  At execution time, fired timers and timers-to-set are represented as
-  PCollections that are managed by the runner.  This phase adds the
-  necissary collections, with their read and writes, to any stages using
-  timers.
+  """Set up a mapping of {transform_id: [timer_ids]} for each stage.
   """
   for stage in stages:
-for transform in list(stage.transforms):
+for transform in stage.transforms:
   if transform.spec.urn in PAR_DO_URNS:
 payload = proto_utils.parse_Bytes(
 transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
-for tag, spec in payload.timer_family_specs.items():
-  if len(transform.inputs) > 1:
-raise NotImplementedError('Timers and side inputs.')
-  input_pcoll = pipeline_context.components.pcollections[next(
-  iter(transform.inputs.values()))]
-  # Create the appropriate coder for the timer PCollection.
-  key_coder_id = input_pcoll.coder_id
-  if (pipeline_context.components.coders[key_coder_id].spec.urn ==
-  common_urns.coders.KV.urn):
-key_coder_id = pipeline_context.components.coders[
-key_coder_id].component_coder_ids[0]
-  key_timer_coder_id = pipeline_context.add_or_get_coder_id(
-  beam_runner_api_pb2.Coder(
-  spec=beam_runner_api_pb2.FunctionSpec(
-  urn=common_urns.coders.KV.urn),
-  component_coder_ids=[
-  key_coder_id, spec.timer_family_coder_id
-  ]))
-  # Inject the read and write pcollections.
-  timer_read_pcoll = unique_name(
-  pipeline_context.components.pcollections,
-  '%s_timers_to_read_%s' % (transform.unique_name, tag))
-  timer_write_pcoll = unique_name(
-  pipeline_context.components.pcollections,
-  '%s_timers_to_write_%s' % (transform.unique_name, tag))
-  pipeline_context.components.pcollections[timer_read_pcoll].CopyFrom(
-  beam_runner_api_pb2.PCollection(
-  unique_name=timer_read_pcoll,
-  coder_id=key_timer_coder_id,
-  windowing_strategy_id=input_pcoll.windowing_strategy_id,
-  is_bounded=input_pcoll.is_bounded))
-  pipeline_context.components.pcollections[timer_write_pcoll].CopyFrom(
-  beam_runner_api_pb2.PCollection(
-  unique_name=timer_write_pcoll,
-  coder_id=key_timer_coder_id,
-  windowing_strategy_id=input_pcoll.windowing_strategy_id,
-  is_bounded=input_pcoll.is_bounded))
-  stage.transforms.append(
-  beam_runner_api_pb2.PTransform(
-  unique_name=timer_read_pcoll + '/Read',
-  outputs={'out': timer_read_pcoll},
-  spec=beam_runner_api_pb2.FunctionSpec(
-  urn=bundle_processor.DATA_INPUT_URN,
-  payload=create_buffer_id(timer_read_pcoll,
-   kind='timers'
-  stage.transforms.append(
-  beam_runner_api_pb2.PTransform(
-  unique_name=timer_write_pcoll + '/Write',
-  inputs={'in': timer_write_pcoll},
-  spec=beam_runner_api_pb2.FunctionSpec(
-  urn=bundle_processor.DATA_OUTPUT_URN,
-  payload=create_buffer_id(
-  timer_write_pcoll, kind='timers'
-  assert tag not in transform.inputs
-  transform.inputs[tag] = timer_read_pcoll
-  assert tag not in transform.outputs
-  transform.outputs[tag] = timer_write_pcoll
-  stage.timer_pcollections.append(
-  (timer_read_pcoll + '/Read', timer_write_pcoll))
+for timer_family_id in payload.timer_family_specs.keys():
+  stage.

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419761
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406471091
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -896,7 +906,9 @@ def _generate_splits_for_testing(self,
 
   def process_bundle(self,
  inputs,  # type: Mapping[str, PartitionableBuffer]
- expected_outputs  # type: DataOutput
+ expected_outputs,  # type: DataOutput
+ fired_timers,  # type: Mapping[str, Mapping[str, 
PartitionableBuffer]]
 
 Review comment:
   Mapping[Tuple[str, str], PartitionableBuffer]?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419761)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419754
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406444656
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1272,6 +1272,8 @@ def expand(self, pcoll):
 key_coder = coder.key_coder()
   else:
 key_coder = coders.registry.get_coder(typehints.Any)
+  self.window_coder = pcoll.windowing.windowfn.get_window_coder()
 
 Review comment:
   Are these still used?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419754)
Time Spent: 19h  (was: 18h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 19h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419695
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 19:56
Start Date: 09/Apr/20 19:56
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406442486
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+has_parts = extra_kwargs.get('has_part', False)
+urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
 
 Review comment:
   Nevermind, I see what's going on here. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419695)
Time Spent: 18h 50m  (was: 18h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419622
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 18:26
Start Date: 09/Apr/20 18:26
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406394861
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -1088,6 +1145,21 @@ def create_operation(self,
 transform_proto.spec.payload, parameter_type)
 return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   The keys of maps should still be the same though(tuple of (transform_id, 
timer_family_id)). That's why I make the value as a map{coder_impl, 
output_stream}
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419622)
Time Spent: 18h 40m  (was: 18.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419611
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 18:14
Start Date: 09/Apr/20 18:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406387842
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -1088,6 +1145,21 @@ def create_operation(self,
 transform_proto.spec.payload, parameter_type)
 return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   Ack, expanding the whole diff I see that this is happening in different 
methods now (in which case here two separate maps, as you had originally, might 
be preferable). But not a big deal. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419611)
Time Spent: 18.5h  (was: 18h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419607
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 18:05
Start Date: 09/Apr/20 18:05
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406382954
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -1088,6 +1145,21 @@ def create_operation(self,
 transform_proto.spec.payload, parameter_type)
 return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   We can only populate output_stream when processing bundle since 
instruction_id is required.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419607)
Time Spent: 18h 20m  (was: 18h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419547
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 16:58
Start Date: 09/Apr/20 16:58
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406344891
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   Correct, when `clear_bit` is `True`, the coder ignores these fields. I think 
we should have a better `Timer` with API `of` and `clear` like in Java as a 
follow up.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419547)
Time Spent: 18h 10m  (was: 18h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419496
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 15:59
Start Date: 09/Apr/20 15:59
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611607510
 
 
   Run Python2_PVR_Flink PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419496)
Time Spent: 18h  (was: 17h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 18h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419439
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 14:55
Start Date: 09/Apr/20 14:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611572961
 
 
   Run Python PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419439)
Time Spent: 17h 50m  (was: 17h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419191
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405989307
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   They're meaningless when we're clearing a timer (e.g. it won't fire, hold 
back the watermark, or have a pane info).
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419191)
Time Spent: 17h  (was: 16h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419201
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406003424
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -408,27 +470,67 @@ def close_callback(data):
 return ClosableOutputStream.create(
 close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+def add_to_send_queue(timer):
+  if timer:
+self._to_send.put(
+beam_fn_api_pb2.Elements.Timer(
+instruction_id=instruction_id,
+transform_id=transform_id,
+timer_family_id=timer_family_id,
+timers=timer,
+is_last=False))
+
+def close_callback(timer):
+  add_to_send_queue(timer)
+  self._to_send.put(
+  beam_fn_api_pb2.Elements.Timer(
+  instruction_id=instruction_id,
+  transform_id=transform_id,
+  timer_family_id=timer_family_id,
+  timers=b'',
+  is_last=True))
+
+return ClosableOutputStream.create(
+close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
 # type: () -> Iterator[beam_fn_api_pb2.Elements]
-done = False
-while not done:
-  data = [self._to_send.get()]
-  try:
-# Coalesce up to 100 other items.
-for _ in range(100):
-  data.append(self._to_send.get_nowait())
-  except queue.Empty:
-pass
-  if data[-1] is self._WRITES_FINISHED:
-done = True
-data.pop()
-  if data:
-yield beam_fn_api_pb2.Elements(data=data)
+stream_done = False
+while not stream_done:
+  streams = None
+  if not stream_done:
+streams = [self._to_send.get()]
+try:
+  # Coalesce up to 100 other items.
+  for _ in range(100):
+streams.append(self._to_send.get_nowait())
+except queue.Empty:
+  pass
+if streams and streams[-1] is self._WRITES_FINISHED:
+  stream_done = True
+  streams.pop()
+  if streams:
+elements = beam_fn_api_pb2.Elements()
+data_stream = []
+timer_stream = []
+for stream in streams:
+  if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
+timer_stream.append(stream)
+  if isinstance(stream, beam_fn_api_pb2.Elements.Data):
+data_stream.append(stream)
+if data_stream:
 
 Review comment:
   No need to have these conditionals, you can just write
   
   `yield beam_fn_api_pb2.Elements(data=data_stream, timer=timer_stream)`
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419201)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419203
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405999089
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -354,15 +410,15 @@ def input_elements(self,
 
 Args:
   instruction_id(str): instruction_id for which data is read
-  expected_transforms(collection): expected transforms
+  expected_inputs(collection): expected inputs, include both data and 
timer.
 """
 received = self._receiving_queue(instruction_id)
-done_transforms = set()  # type: Set[str]
+done_inputs = set()  # type: Set[str]
 
 Review comment:
   update type hint
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419203)
Time Spent: 17h 40m  (was: 17.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419198
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405998009
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -274,20 +301,47 @@ def inverse(self):
 return self._inverse
 
   def input_elements(self,
- instruction_id,  # type: str
- unused_expected_transforms=None,  # type: 
Optional[Collection[str]]
- abort_callback=None  # type: Optional[Callable[[], bool]]
-):
-# type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data]
 
 Review comment:
   It'd be good to not lose the typing information. You can make an alias at 
the top of the file `DataOrTimers = Union[beam_fn_api_pb2.Elements.Data, 
beam_fn_api_pb2.Elements.Timer]` to cut down on verbosity, here and elsewhere. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419198)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419189&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419189
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405985691
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+has_parts = extra_kwargs.get('has_part', False)
 
 Review comment:
   You can leave this in the parameter list.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419189)
Time Spent: 16h 50m  (was: 16h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419197&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419197
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405999346
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -372,12 +428,18 @@ def input_elements(self,
 t, v, tb = self._exc_info
 raise_(t, v, tb)
 else:
-  # TODO(BEAM-9558): Cleanup once dataflow is updated.
-  if not data.data or data.is_last:
-done_transforms.add(data.transform_id)
-  else:
-assert data.transform_id not in done_transforms
-yield data
+  if isinstance(element, beam_fn_api_pb2.Elements.Timer):
+if element.is_last:
+  done_inputs.add((element.transform_id, element.timer_family_id))
+else:
+  yield element
+  if isinstance(element, beam_fn_api_pb2.Elements.Data):
 
 Review comment:
   elif
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419197)
Time Spent: 17h 20m  (was: 17h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419200
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406003474
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -408,27 +470,67 @@ def close_callback(data):
 return ClosableOutputStream.create(
 close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+def add_to_send_queue(timer):
+  if timer:
+self._to_send.put(
+beam_fn_api_pb2.Elements.Timer(
+instruction_id=instruction_id,
+transform_id=transform_id,
+timer_family_id=timer_family_id,
+timers=timer,
+is_last=False))
+
+def close_callback(timer):
+  add_to_send_queue(timer)
+  self._to_send.put(
+  beam_fn_api_pb2.Elements.Timer(
+  instruction_id=instruction_id,
+  transform_id=transform_id,
+  timer_family_id=timer_family_id,
+  timers=b'',
+  is_last=True))
+
+return ClosableOutputStream.create(
+close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
 # type: () -> Iterator[beam_fn_api_pb2.Elements]
-done = False
-while not done:
-  data = [self._to_send.get()]
-  try:
-# Coalesce up to 100 other items.
-for _ in range(100):
-  data.append(self._to_send.get_nowait())
-  except queue.Empty:
-pass
-  if data[-1] is self._WRITES_FINISHED:
-done = True
-data.pop()
-  if data:
-yield beam_fn_api_pb2.Elements(data=data)
+stream_done = False
+while not stream_done:
+  streams = None
+  if not stream_done:
+streams = [self._to_send.get()]
+try:
+  # Coalesce up to 100 other items.
+  for _ in range(100):
+streams.append(self._to_send.get_nowait())
+except queue.Empty:
+  pass
+if streams and streams[-1] is self._WRITES_FINISHED:
+  stream_done = True
+  streams.pop()
+  if streams:
+elements = beam_fn_api_pb2.Elements()
+data_stream = []
+timer_stream = []
+for stream in streams:
+  if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
+timer_stream.append(stream)
+  if isinstance(stream, beam_fn_api_pb2.Elements.Data):
 
 Review comment:
   else
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419200)
Time Spent: 17.5h  (was: 17h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419196
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405998237
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -274,20 +301,47 @@ def inverse(self):
 return self._inverse
 
   def input_elements(self,
- instruction_id,  # type: str
- unused_expected_transforms=None,  # type: 
Optional[Collection[str]]
- abort_callback=None  # type: Optional[Callable[[], bool]]
-):
-# type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data]
+  instruction_id,  # type: str
+  unused_expected_inputes=None,   # type: Collection[str]
 
 Review comment:
   inputes -> inputs
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419196)
Time Spent: 17h 20m  (was: 17h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419194&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419194
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405994886
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -745,6 +746,24 @@ def __init__(self,
 self.process_bundle_descriptor = process_bundle_descriptor
 self.state_handler = state_handler
 self.data_channel_factory = data_channel_factory
+
+# There is no guarantee that the runner only set
+# timer_api_service_descriptor when having timers. So this field cannot be
+# used as an indicator of timers.
+if self.process_bundle_descriptor.timer_api_service_descriptor:
+  self.timer_data_channel = (
+  data_channel_factory.create_data_channel_from_url(
+  self.process_bundle_descriptor.timer_api_service_descriptor.url))
+else:
+  self.timer_data_channel = None
+
+# A mapping of
+# {(transform_id, timer_family_id):
+# {"timer_coder_impl": coder, "output_stream": stream}}
 
 Review comment:
   Optonal: [Named] tuples are usually easier to work with than dicts.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419194)
Time Spent: 17h 10m  (was: 17h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419193&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419193
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405990412
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -745,6 +746,24 @@ def __init__(self,
 self.process_bundle_descriptor = process_bundle_descriptor
 self.state_handler = state_handler
 self.data_channel_factory = data_channel_factory
+
+# There is no guarantee that the runner only set
+# timer_api_service_descriptor when having timers. So this field cannot be
+# used as an indicator of timers.
+if self.process_bundle_descriptor.timer_api_service_descriptor:
+  self.timer_data_channel = (
+  data_channel_factory.create_data_channel_from_url(
+  self.process_bundle_descriptor.timer_api_service_descriptor.url))
+else:
+  self.timer_data_channel = None
+
+# A mapping of
+# {(transform_id, timer_family_id):
+# {"timer_coder_impl": coder, "output_stream": stream}}
+# The mapping keeps empty when there is no timer_family_specs in the
 
 Review comment:
   Nit: The mapping stays (or is) empty...
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419193)
Time Spent: 17h 10m  (was: 17h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419195&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419195
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405995554
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -1088,6 +1145,21 @@ def create_operation(self,
 transform_proto.spec.payload, parameter_type)
 return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def extract_timers_info(self):
 
 Review comment:
   I would populate output_stream here as well rather than above.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419195)
Time Spent: 17h 20m  (was: 17h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419188&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419188
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405986529
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+has_parts = extra_kwargs.get('has_part', False)
+urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
+if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
+  # TODO(BEAM-3812): Remove this fallback.
+  urn, typed_param = self.to_runner_api_pickled(context)
+return beam_runner_api_pb2.FunctionSpec(
+urn=urn,
+payload=typed_param.SerializeToString() if isinstance(
+typed_param, message.Message) else typed_param.encode('utf-8')
+if isinstance(typed_param, str) else typed_param)
+
+  def to_runner_api_parameter(self, context, **extra_kwargs):
 # type: (PipelineContext) -> typing.Tuple[str, message.Message]
 assert isinstance(self, ParDo), \
 "expected instance of ParDo, but got %s" % self.__class__
+key_coder, window_coder = self._get_key_and_window_coder(
 
 Review comment:
   Maybe put this in the if block below closer to where they're used? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419188)
Time Spent: 16h 40m  (was: 16.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419202&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419202
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406004352
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/operations.pxd
 ##
 @@ -92,7 +92,7 @@ cdef class DoOperation(Operation):
   cdef DoFnRunner dofn_runner
   cdef object tagged_receivers
   cdef object side_input_maps
-  cdef object user_state_context
+  cpdef public object user_state_context
 
 Review comment:
   Rather than making this public, I would add an `add_timer_info` method to 
this operation. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419202)
Time Spent: 17h 40m  (was: 17.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419190&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419190
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405987932
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
 windowing = None
 return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+if named_inputs is None or not self._signature.is_stateful_dofn():
+  return None, None
+main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+input_pcoll = named_inputs[main_input]
+kv_type_hint = input_pcoll.element_type
+if kv_type_hint and kv_type_hint != typehints.Any:
+  coder = coders.registry.get_coder(kv_type_hint)
+  if not coder.is_kv_coder():
+raise ValueError(
+'Input elements to the transform %s with stateful DoFn must be '
+'key-value pairs.' % self)
+  key_coder = coder.key_coder()
+else:
+  key_coder = coders.registry.get_coder(typehints.Any)
+window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+has_parts = extra_kwargs.get('has_part', False)
+urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
 
 Review comment:
   https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md?cl=head
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419190)
Time Spent: 16h 50m  (was: 16h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419199
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406002550
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -408,27 +470,67 @@ def close_callback(data):
 return ClosableOutputStream.create(
 close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
 
+  def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
+def add_to_send_queue(timer):
+  if timer:
+self._to_send.put(
+beam_fn_api_pb2.Elements.Timer(
+instruction_id=instruction_id,
+transform_id=transform_id,
+timer_family_id=timer_family_id,
+timers=timer,
+is_last=False))
+
+def close_callback(timer):
+  add_to_send_queue(timer)
+  self._to_send.put(
+  beam_fn_api_pb2.Elements.Timer(
+  instruction_id=instruction_id,
+  transform_id=transform_id,
+  timer_family_id=timer_family_id,
+  timers=b'',
+  is_last=True))
+
+return ClosableOutputStream.create(
+close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
+
   def _write_outputs(self):
 # type: () -> Iterator[beam_fn_api_pb2.Elements]
-done = False
-while not done:
-  data = [self._to_send.get()]
-  try:
-# Coalesce up to 100 other items.
-for _ in range(100):
-  data.append(self._to_send.get_nowait())
-  except queue.Empty:
-pass
-  if data[-1] is self._WRITES_FINISHED:
-done = True
-data.pop()
-  if data:
-yield beam_fn_api_pb2.Elements(data=data)
+stream_done = False
+while not stream_done:
+  streams = None
+  if not stream_done:
 
 Review comment:
   This will always be true (given the loop condition). 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419199)
Time Spent: 17.5h  (was: 17h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419192&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419192
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405990026
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -629,28 +628,28 @@ def __init__(self,
 self._transform_id = transform_id
 self._key_coder = key_coder
 self._window_coder = window_coder
-self._timer_family_specs = timer_family_specs
-self._timer_receivers = None  # type: Optional[Dict[str, 
operations.ConsumerSet]]
+# A mapping of {timer_family_id: OutputStream}
+self._timer_output_streams = {}
+self._timer_coders_impl = {}
 self._all_states = {
 }  # type: Dict[tuple, userstate.AccumulatingRuntimeState]
 
-  def update_timer_receivers(self, receivers):
-# type: (operations._TaggedReceivers) -> None
-
-"""TODO"""
-self._timer_receivers = {}
-for tag in self._timer_family_specs:
-  self._timer_receivers[tag] = receivers.pop(tag)
+  def add_timer_info(self, timer_family_id, output_stream, coder_impl):
+self._timer_output_streams[timer_family_id] = output_stream
+self._timer_coders_impl[timer_family_id] = coder_impl
 
   def get_timer(
   self,
   timer_spec,
   key,
-  window  # type: windowed_value.BoundedWindow
-  ):
+  window,  # type: windowed_value.BoundedWindow
+  pane):
 # type: (...) -> OutputTimer
-assert self._timer_receivers is not None
-return OutputTimer(key, window, self._timer_receivers[timer_spec.name])
+output_stream = self._timer_output_streams[timer_spec.name]
 
 Review comment:
   If this were a single map rather that two parallel maps, you could write 
something like
   
   `output_tream, timer_coder_impl = self._timer_info(timer_spec.name]`
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419192)
Time Spent: 17h 10m  (was: 17h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 17h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419166&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419166
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 06:55
Start Date: 09/Apr/20 06:55
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611363247
 
 
   The 
[test_pardo_timers_clear](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L351-L389)
 fails with streaming Flink. The python sdk sends all 
timers(hold_timestamp=-INF with python default behavior) but only gets the 
timer with timestamp=20 back. Given the test only fails when streaming, it 
seems like something not correct with watermark(?). @lukecwik 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419166)
Time Spent: 16.5h  (was: 16h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419062
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 01:35
Start Date: 09/Apr/20 01:35
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405907504
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
 sdkFusedStage =
 pipeline == null
 ? RegisterNodeFunction.withoutPipeline(
-idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+idGenerator,
+sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   That is correct.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419062)
Time Spent: 16h 20m  (was: 16h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419043&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419043
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 00:40
Start Date: 09/Apr/20 00:40
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892978
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   > (Should the coder be ignoring them as well?)
   
   No, the timer coder is encoding all of these info now.
   
   > Don't bother setting these timestamps, or paneinfo.
   
   Could you please explain more about this? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419043)
Time Spent: 16h 10m  (was: 16h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419042
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 00:39
Start Date: 09/Apr/20 00:39
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892978
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   > (Should the coder be ignoring them as well?)
   No, the timer coder is encoding all of these info now.
   
   > Don't bother setting these timestamps, or paneinfo.
   Could you please explain more about this? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419042)
Time Spent: 16h  (was: 15h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 16h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419040
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 00:38
Start Date: 09/Apr/20 00:38
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #11314: 
[BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405892635
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
 ##
 @@ -134,14 +136,18 @@ public static RegisterNodeFunction forPipeline(
* harnesses, then this method should be removed.
*/
   public static RegisterNodeFunction withoutPipeline(
-  IdGenerator idGenerator, Endpoints.ApiServiceDescriptor 
stateApiServiceDescriptor) {
-return new RegisterNodeFunction(null, idGenerator, 
stateApiServiceDescriptor);
+  IdGenerator idGenerator,
+  Endpoints.ApiServiceDescriptor stateApiServiceDescriptor,
+  Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
+return new RegisterNodeFunction(
+null, idGenerator, stateApiServiceDescriptor, 
timerApiServiceDescriptor);
   }
 
   private RegisterNodeFunction(
   @Nullable RunnerApi.Pipeline pipeline,
   IdGenerator idGenerator,
-  Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) {
+  Endpoints.ApiServiceDescriptor stateApiServiceDescriptor,
+  Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) {
 
 Review comment:
   timerApiServiceDescriptor isn't used? Should it be stored and written to the 
ProcessBundleDescrioptor?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419040)
Time Spent: 15h 50m  (was: 15h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419034
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 09/Apr/20 00:29
Start Date: 09/Apr/20 00:29
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #11314: 
[BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405890215
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
 sdkFusedStage =
 pipeline == null
 ? RegisterNodeFunction.withoutPipeline(
-idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+idGenerator,
+sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   I see. So we only have a separate timer_api_service_descriptor in the protos 
so that a runner has the option to make it separate, but it doesn't need to be 
separate?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 419034)
Time Spent: 15h 40m  (was: 15.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418986
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 23:32
Start Date: 08/Apr/20 23:32
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405873264
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 ##
 @@ -914,6 +923,7 @@ public Object createRunnerForPTransform(
   PipelineOptions pipelineOptions,
   BeamFnDataClient beamFnDataClient,
   BeamFnStateClient beamFnStateClient,
+  BeamFnTimerClient beamFnTimerClient,
   String pTransformId,
   PTransform pTransform,
   Supplier processBundleInstructionId,
 
 Review comment:
   for completeness yes
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418986)
Time Spent: 15.5h  (was: 15h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418968&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418968
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 23:25
Start Date: 08/Apr/20 23:25
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405870882
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
 sdkFusedStage =
 pipeline == null
 ? RegisterNodeFunction.withoutPipeline(
-idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+idGenerator,
+sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   They both use the Data API so no. All were saying here is that we will 
re-use the same gRPC channel for both timers and data.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418968)
Time Spent: 15h 20m  (was: 15h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418967
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 23:24
Start Date: 08/Apr/20 23:24
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405870539
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo(
   restrictionCoderId = "";
 }
 
+Coder windowCoder =
+(Coder) 
mainInput.getWindowingStrategy().getWindowFn().windowCoder();
+Coder keyCoder;
+if (signature.usesState() || signature.usesTimers()) {
+  checkArgument(
+  mainInput.getCoder() instanceof KvCoder,
+  "DoFn's that use state or timers must have an input PCollection with 
a KvCoder but received %s",
+  mainInput.getCoder());
 
 Review comment:
   It was being covered by validation in DoFnSignatures but it is being 
repeated here for defense in depth reasons.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418967)
Time Spent: 15h 10m  (was: 15h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418956
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 22:53
Start Date: 08/Apr/20 22:53
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #11314: 
[BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405823916
 
 

 ##
 File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 ##
 @@ -914,6 +923,7 @@ public Object createRunnerForPTransform(
   PipelineOptions pipelineOptions,
   BeamFnDataClient beamFnDataClient,
   BeamFnStateClient beamFnStateClient,
+  BeamFnTimerClient beamFnTimerClient,
   String pTransformId,
   PTransform pTransform,
   Supplier processBundleInstructionId,
 
 Review comment:
   This test is `testStateCallsFailIfNoStateApiServiceDescriptorSpecified`. Is 
there value in a `testTimerCallsFailIfNoTimerApiServiceDescriptorSpecified` to 
exercise the new "Timers are unsupported because ..." exception?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418956)
Time Spent: 15h  (was: 14h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418955
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 22:53
Start Date: 08/Apr/20 22:53
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #11314: 
[BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405855696
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 ##
 @@ -238,9 +238,14 @@ protected BatchDataflowWorker(
 sdkFusedStage =
 pipeline == null
 ? RegisterNodeFunction.withoutPipeline(
-idGenerator, 
sdkHarnessRegistry.beamFnStateApiServiceDescriptor())
+idGenerator,
+sdkHarnessRegistry.beamFnStateApiServiceDescriptor(),
+sdkHarnessRegistry.beamFnDataApiServiceDescriptor())
 
 Review comment:
   Isn't the timer API service descriptor different from the data API service 
descriptor? Does that need to be plumbed through SdkHarnessRegistry and used 
here instead of the data API descriptor? (same question below and in streaming 
worker)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418955)
Time Spent: 15h  (was: 14h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418954&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418954
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 22:53
Start Date: 08/Apr/20 22:53
Worklog Time Spent: 10m 
  Work Description: TheNeuralBit commented on pull request #11314: 
[BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405831920
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo(
   restrictionCoderId = "";
 }
 
+Coder windowCoder =
+(Coder) 
mainInput.getWindowingStrategy().getWindowFn().windowCoder();
+Coder keyCoder;
+if (signature.usesState() || signature.usesTimers()) {
+  checkArgument(
+  mainInput.getCoder() instanceof KvCoder,
+  "DoFn's that use state or timers must have an input PCollection with 
a KvCoder but received %s",
+  mainInput.getCoder());
 
 Review comment:
   Just curious: did we not have this check before, and just failed when 
attempting to cast to KVCoder  (in the removed block from `translate` above)?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418954)
Time Spent: 15h  (was: 14h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 15h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418950
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 22:35
Start Date: 08/Apr/20 22:35
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #11314: [BEAM-9562] Send 
Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611229498
 
 
   I'm sorry. I am havinhg a heavy headache. I'll bow out. @robertwb can you 
review fn_runner.py and siblings?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418950)
Time Spent: 14h 50m  (was: 14h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418907
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405815160
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
 
 Review comment:
   A type on this parameter would be useful.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418907)
Time Spent: 14h  (was: 13h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418908&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418908
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405817899
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -611,7 +611,7 @@ def __init__(self,
transform_id,  # type: str
key_coder,  # type: coders.Coder
window_coder,  # type: coders.Coder
-   timer_family_specs  # type: Mapping[str, 
beam_runner_api_pb2.TimerFamilySpec]
+   timer_coders
 
 Review comment:
   type?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418908)
Time Spent: 14h 10m  (was: 14h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418913&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418913
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405825392
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -846,6 +870,30 @@ def process_bundle(self, instruction_id):
 data_channels[input_op.data_channel].append(input_op.transform_id)
 input_op_by_transform_id[input_op.transform_id] = input_op
 
+# Set up timer output stream
+  timer_output_streams = {}
+  for transform_id, timer_list in self.timer_ids.items():
+output_streams = {}
+for timer_id in timer_list:
+  output_streams[
+  timer_id] = self.timer_data_channel.output_timer_stream(
+  instruction_id, transform_id, timer_id)
+  timer_output_streams[transform_id] = output_streams
+self.process_timer_ops[
+transform_id].user_state_context.update_timer_output_streams(
 
 Review comment:
   Nit: rather than this double nesting, it might simplify things to have an 
`update_timer_output_streams(timer_id, output_stream)` method that could be 
called repeatedly. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418913)
Time Spent: 14h 40m  (was: 14.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418909
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405817317
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   (Should the coder be ignoring them as well?)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418909)
Time Spent: 14h 10m  (was: 14h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418912&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418912
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405826837
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -846,6 +870,30 @@ def process_bundle(self, instruction_id):
 data_channels[input_op.data_channel].append(input_op.transform_id)
 input_op_by_transform_id[input_op.transform_id] = input_op
 
+# Set up timer output stream
+  timer_output_streams = {}
+  for transform_id, timer_list in self.timer_ids.items():
+output_streams = {}
+for timer_id in timer_list:
+  output_streams[
+  timer_id] = self.timer_data_channel.output_timer_stream(
+  instruction_id, transform_id, timer_id)
+  timer_output_streams[transform_id] = output_streams
+self.process_timer_ops[
+transform_id].user_state_context.update_timer_output_streams(
+output_streams)
+
+  # Process timers
+  if self.timer_data_channel:
 
 Review comment:
   We can't safely assume the runner will finish sending all timers before 
sending any of the data (and the buffer may get full, resulting in a deadlock). 
I think we need to have a data_channel.inputs() that returns both data and 
timers and then branch in the loop. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418912)
Time Spent: 14.5h  (was: 14h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418910&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418910
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405816460
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
key,
window,  # type: windowed_value.BoundedWindow
-   receiver  # type: operations.ConsumerSet
+   paneinfo,
+   timer_family_id,
+   timer_coder_impl,
+   output_stream
   ):
 self._key = key
 self._window = window
-self._receiver = receiver
+self._paneinfo = paneinfo
+self._timer_family_id = timer_family_id
+self._output_stream = output_stream
+self._timer_coder_impl = timer_coder_impl
 
   def set(self, ts):
 ts = timestamp.Timestamp.of(ts)
-# TODO(BEAM-9562): Plumb through actual timer fields.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=ts,
-hold_timestamp=ts,
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- ts, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=ts,
+hold_timestamp=ts,
+paneinfo=self._paneinfo)
+self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+self._output_stream.maybe_flush()
 
   def clear(self):
 # type: () -> None
 dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-# TODO(BEAM-9562): Plumb through actual paneinfo.
-self._receiver.receive(
-windowed_value.WindowedValue((
-self._key,
-userstate.Timer(
-user_key='',
-dynamic_timer_tag='',
-windows=(self._window, ),
-clear_bit=False,
-fire_timestamp=timestamp.Timestamp.of(clear_ts),
-hold_timestamp=timestamp.Timestamp.of(0),
-paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
- 0, (self._window, )))
+timer = userstate.Timer(
+user_key=self._key,
+dynamic_timer_tag='',
+windows=(self._window, ),
+clear_bit=False,
+fire_timestamp=clear_ts,
 
 Review comment:
   Don't bother setting these timestamps, or paneinfo.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418910)
Time Spent: 14h 20m  (was: 14h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418911&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418911
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 08/Apr/20 21:33
Start Date: 08/Apr/20 21:33
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11314: [BEAM-9562] 
Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405824290
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
 ##
 @@ -1088,6 +1142,30 @@ def create_operation(self,
 transform_proto.spec.payload, parameter_type)
 return creator(self, transform_id, transform_proto, payload, consumers)
 
+  def get_timer_coders(self):
+timer_coder = {}
+for transform_id, transform_proto in self.descriptor.transforms.items():
 
 Review comment:
   I see us doing this loop three times now. Perhaps it would be more useful to 
do the loop once to set everything up, creating a single dictionary 
(transform_id, timer_family_id) -> (all info about that timer we need to 
dispatch them).
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 418911)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-harness, sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
> Fix For: 2.21.0
>
>  Time Spent: 14h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415580&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415580
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 16:33
Start Date: 03/Apr/20 16:33
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update 
Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608539461
 
 
   All tests passed. i'm going to merge it. Thanks, everyone!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415580)
Time Spent: 13h 40m  (was: 13.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 13h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415582&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415582
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 16:33
Start Date: 03/Apr/20 16:33
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415582)
Time Spent: 13h 50m  (was: 13h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 13h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415558
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 15:48
Start Date: 03/Apr/20 15:48
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r403099609
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+// Hash only the millis of the timestamp to be consistent with equals
+if (getClearBit()) {
+  return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+}
+return Objects.hash(
+getUserKey(),
+getDynamicTimerTag(),
+getClearBit(),
+getFireTimestamp().getMillis(),
+getHoldTimestamp().getMillis(),
+getWindows(),
+getPane());
 
 Review comment:
   good point
 

This is 

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415322
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 05:44
Start Date: 03/Apr/20 05:44
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update 
Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608241577
 
 
   Run Java_Examples_Dataflow PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415322)
Time Spent: 13h 20m  (was: 13h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 13h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415313
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 04:57
Start Date: 03/Apr/20 04:57
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update 
Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#issuecomment-608229553
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415313)
Time Spent: 13h 10m  (was: 13h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 13h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415298
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 03:13
Start Date: 03/Apr/20 03:13
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402713697
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+// Hash only the millis of the timestamp to be consistent with equals
+if (getClearBit()) {
+  return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+}
+return Objects.hash(
+getUserKey(),
+getDynamicTimerTag(),
+getClearBit(),
+getFireTimestamp().getMillis(),
+getHoldTimestamp().getMillis(),
+getWindows(),
+getPane());
 
 Review comment:
   `getFireTimestamp()` returns `null` when `clearBit` is `true`. I think we 
still need 

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415274
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402692210
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
 
 Review comment:
   ```suggestion
 /** Returns the windows which are associated with the timer. */
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415274)
Time Spent: 12h 50m  (was: 12h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415267
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688089
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
 
 Review comment:
   ```suggestion
  * Returns the {@link PaneInfo} that is related to the timer. This field 
is nullable only when the
  * timer is being cleared.
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415267)
Time Spent: 12h 10m  (was: 12h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415265
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687712
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -21,17 +21,29 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.joda.time.Instant;
 
 /**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a user key, a dynamic timer tag and either a bit that 
says that this timer
+ * should be cleared or data representing the firing timestamp, hold timestamp 
and a list of windows
+ * and pane information that should be used when producing output.
 
 Review comment:
   ```suggestion
* A timer consists of a user key, a dynamic timer tag, a set of windows and 
either a bit that says that this timer
* should be cleared or data representing the firing timestamp, hold 
timestamp and pane information that should be used when producing output.
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415265)
Time Spent: 12h  (was: 11h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415271
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402689183
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+// Hash only the millis of the timestamp to be consistent with equals
+if (getClearBit()) {
+  return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+}
+return Objects.hash(
+getUserKey(),
+getDynamicTimerTag(),
+getClearBit(),
+getFireTimestamp().getMillis(),
+getHoldTimestamp().getMillis(),
+getWindows(),
+getPane());
+  }
 
   /**
* A {@link org.apache.beam.sdk.coders.Coder} for timers.
*
-   * This coder is determi

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415272
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688977
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+// Hash only the millis of the timestamp to be consistent with equals
+if (getClearBit()) {
+  return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+}
 
 Review comment:
   The hash is still stable will null objects.
   ```suggestion
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415269
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402688769
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
 
 Review comment:
   This will make the method less error prone in case the contract changes.
   ```suggestion
   return Objects.equals(this.getUserKey(), that.getUserKey())
   && Objects.equals(this.getDynamicTimerTag(), 
that.getDynamicTimerTag())
   && Objects.equals(this.getWindows(), that.getWindows())
   && (this.getClearBit() == that.getClearBit())
   && Objects.equals(this.getFireTimestamp(), that.getFireTimestamp())
   && Objects.equals(this.getHoldTimestamp(), that.getHoldTimestamp())
   && Objects.equals(this.get

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415273&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415273
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402692551
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
 ##
 @@ -33,42 +35,128 @@
 /** Tests for {@link Timer}. */
 @RunWith(JUnit4.class)
 public class TimerTest {
-  private static final Instant INSTANT = Instant.now();
+  private static final Instant FIREINSTANT = new Instant(123L);
+  private static final Instant HOLDINSTANT = new Instant(456L);
 
 Review comment:
   ```suggestion
 private static final Instant FIRE_TIME = new Instant(123L);
 private static final Instant HOLD_TIME = new Instant(456L);
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415273)
Time Spent: 12h 40m  (was: 12.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415268&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415268
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687822
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
 
 Review comment:
   ```suggestion
  * Returns the tag that the timer is set on. The tag is {@code ""} when 
the timer is for a {@link
  * TimerSpec}.
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415268)
Time Spent: 12h 20m  (was: 12h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415266
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402687607
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
 
 Review comment:
   ```suggestion
 /** Returns a cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag} and {@code windows}. */
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415266)
Time Spent: 12h  (was: 11h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 12h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415270
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:55
Start Date: 03/Apr/20 01:55
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402689030
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code 
dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static  Timer of(
+  T userKey,
+  String dynamicTimerTag,
+  Collection windows,
+  Instant fireTimestamp,
+  Instant holdTimestamp,
+  PaneInfo pane) {
+return new AutoValue_Timer(
+userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static  Timer of(Instant timestamp, @Nullable T payload) {
-return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and 
windows. */
+  public static  Timer cleared(
+  T userKey, String dynamicTimerTag, Collection 
windows) {
+return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the 
timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field 
is nullable only when
+   * the timer is being cleared.
*
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field 
is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable 
only when the
+   * timer is being cleared.
+   */
+  @Nullable
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+if (!(other instanceof Timer)) {
+  return false;
+}
+Timer that = (Timer) other;
+if (this.getClearBit()) {
+  return Objects.equals(this.getUserKey(), that.getUserKey())
+  && this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+  && (this.getClearBit() == that.getClearBit())
+  && Objects.equals(this.getWindows(), that.getWindows());
+}
+return Objects.equals(this.getUserKey(), that.getUserKey())
+&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag())
+&& (this.getClearBit() == that.getClearBit())
+&& this.getFireTimestamp().equals(that.getFireTimestamp())
+&& this.getHoldTimestamp().equals(that.getHoldTimestamp())
+&& Objects.equals(this.getWindows(), that.getWindows())
+&& Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+// Hash only the millis of the timestamp to be consistent with equals
+if (getClearBit()) {
+  return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), 
getWindows());
+}
+return Objects.hash(
+getUserKey(),
+getDynamicTimerTag(),
+getClearBit(),
+getFireTimestamp().getMillis(),
+getHoldTimestamp().getMillis(),
+getWindows(),
+getPane());
 
 Review comment:
   ```suggestion
   return Objects.hash(
   getUserKey(),
   getDynam

[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415253
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 03/Apr/20 01:22
Start Date: 03/Apr/20 01:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: [BEAM-9562] 
Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r402685419
 
 

 ##
 File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
 ##
 @@ -28,6 +28,23 @@
 # one of a few standard JSON types such as numbers, strings, dicts 
that map naturally
 # to the type encoded by the coder.
 #
+# Java code snippet to generate example bytes:
+#   Coder> coder = Timer.Coder.of(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
+#   Instant now = new Instant(1000L);
+#   Timer timer = Timer.of(
+# "key",
+# "tag",
+# now,
+# now,
+# Collections.singletonList(GlobalWindow.INSTANCE),
+# PaneInfo.NO_FIRING);
+#   byte[] byets = CoderUtils.encodeToByteArray(coder, timer);
+#   String str = new String(byets, 
java.nio.charset.StandardCharsets.ISO_8859_1);
 
 Review comment:
   sg
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415253)
Time Spent: 11h 50m  (was: 11h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415007&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415007
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 02/Apr/20 19:45
Start Date: 02/Apr/20 19:45
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #11297: [BEAM-9562] Update 
missed TimerSpec conversion in Go SDK
URL: https://github.com/apache/beam/pull/11297#issuecomment-608066725
 
 
   We don't that's utility code.
   
   It'll need to be fixed up properly for any missing/new fields later after 
stability is hit, but only for new to the SDK features like Timer families.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 415007)
Time Spent: 11h 40m  (was: 11.5h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-04-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=414996&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414996
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 02/Apr/20 19:39
Start Date: 02/Apr/20 19:39
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11297: [BEAM-9562] 
Update missed TimerSpec conversion in Go SDK
URL: https://github.com/apache/beam/pull/11297
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 414996)
Time Spent: 11.5h  (was: 11h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 11.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >