[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421682&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421682 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 22:02
Start Date: 13/Apr/20 22:02
Worklog Time Spent: 10m
Work Description: ibzib commented on pull request #11407: [BEAM-9562] Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 421682)
Time Spent: 24h 20m (was: 24h 10m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 24h 20m
> Remaining Estimate: 0h

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421626 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 20:43
Start Date: 13/Apr/20 20:43
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #11407: [BEAM-9562] Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407#issuecomment-613088556

R: @ibzib

Issue Time Tracking
---
Worklog Id: (was: 421626)
Time Spent: 24h 10m (was: 24h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421625 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 20:42
Start Date: 13/Apr/20 20:42
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11407: [BEAM-9562] Cherry-pick: Fix output timestamp to be inferred from scheduled time w…
URL: https://github.com/apache/beam/pull/11407

…hen in the event time domain.

(cherry picked from commit 009578e374523f5acd8d24543ef1ceec30542a95)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421624&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421624 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 20:36
Start Date: 13/Apr/20 20:36
Worklog Time Spent: 10m
Work Description: mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613085672

I was actually working on something related to timers in #11362 and was surprised to see that the test failed when I opened the PR, since I had run tests locally. Then figured something must have changed on master in the meantime. Thanks for following up with this!

Issue Time Tracking
---
Worklog Id: (was: 421624)
Time Spent: 23h 50m (was: 23h 40m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421617&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421617 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 20:30
Start Date: 13/Apr/20 20:30
Worklog Time Spent: 10m
Work Description: mxm commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613082627

Thanks for correcting this!

Issue Time Tracking
---
Worklog Id: (was: 421617)
Time Spent: 23h 40m (was: 23.5h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421597 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 19:57
Start Date: 13/Apr/20 19:57
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402

Issue Time Tracking
---
Worklog Id: (was: 421597)
Time Spent: 23.5h (was: 23h 20m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421537 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036333

Run Java Spark PortableValidatesRunner Batch

Issue Time Tracking
---
Worklog Id: (was: 421537)
Time Spent: 23h (was: 22h 50m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421538 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036337

Run Java Flink PortableValidatesRunner Streaming

Issue Time Tracking
---
Worklog Id: (was: 421538)
Time Spent: 23h 10m (was: 23h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421536&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421536 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036251

Run Java Flink PortableValidatesRunner Batch

Issue Time Tracking
---
Worklog Id: (was: 421536)
Time Spent: 22h 50m (was: 22h 40m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421539 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:44
Start Date: 13/Apr/20 18:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613036374

Run Java Spark PortableValidatesRunner Batch

Issue Time Tracking
---
Worklog Id: (was: 421539)
Time Spent: 23h 20m (was: 23h 10m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421535 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:43
Start Date: 13/Apr/20 18:43
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402#issuecomment-613035835

R: @boyuanzz @robertwb
CC: @ibzib We'll need this for 2.21.0 release.
CC: @mxm

Issue Time Tracking
---
Worklog Id: (was: 421535)
Time Spent: 22h 40m (was: 22.5h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421531&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421531 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:41
Start Date: 13/Apr/20 18:41
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #11402: [BEAM-9562] Fix output timestamp to be inferred from scheduled time when in the event time domain.
URL: https://github.com/apache/beam/pull/11402

I copied the setting/validation logic from the [SimpleDoFnRunner Timers](https://github.com/apache/beam/blob/296f5a74f981c9023a54e1c2c89db7ee8e6b428a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L844) implementation.
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421532 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 18:41
Start Date: 13/Apr/20 18:41
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613034995

The problem is with the Timer implementation inside the FnApiDoFnRunner. The spec for Timer wasn't clear as to what the defaults were when withOutputTimestamp was added and hence some critical logic was deleted during the migration. See #11402 for the fix.

Issue Time Tracking
---
Worklog Id: (was: 421532)
Time Spent: 22.5h (was: 22h 20m)
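Editorial sketch, not code from the thread: the default that the comment above and the title of #11402 describe (when no output timestamp is given, infer it from the scheduled time in the event time domain) might look roughly like the following. The class, the nested `Domain` enum, and the method name are hypothetical stand-ins rather than Beam's API, and the processing-time fallback to the input element's timestamp is an assumption, not something stated in the thread.

```java
import java.time.Instant;

// Hypothetical, self-contained sketch; none of these names are Beam classes.
final class TimerOutputTimestampSketch {

  // Stand-in for a timer's time domain.
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  /**
   * Picks the output timestamp for a timer.
   *
   * @param explicitOutputTimestamp the value a user passed via withOutputTimestamp,
   *     or null if none was set
   */
  static Instant resolveOutputTimestamp(
      Domain domain,
      Instant scheduledTime,
      Instant elementInputTimestamp,
      Instant explicitOutputTimestamp) {
    if (explicitOutputTimestamp != null) {
      // An explicitly set output timestamp always wins.
      return explicitOutputTimestamp;
    }
    if (domain == Domain.EVENT_TIME) {
      // Behaviour named in the PR title: infer from the scheduled time.
      return scheduledTime;
    }
    // Assumption for this sketch: other domains fall back to the input timestamp.
    return elementInputTimestamp;
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2020-04-13T18:00:00Z");
    Instant scheduled = Instant.parse("2020-04-13T19:00:00Z");
    System.out.println(
        resolveOutputTimestamp(Domain.EVENT_TIME, scheduled, input, null));
    System.out.println(
        resolveOutputTimestamp(Domain.PROCESSING_TIME, scheduled, input, null));
  }
}
```

The sketch leaves out the validation half of the "setting/validation logic" mentioned in #11402, since the thread does not spell out which checks are involved.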
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421463&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421463 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 17:38
Start Date: 13/Apr/20 17:38
Worklog Time Spent: 10m
Work Description: boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613006148

> This is a big change which also affects the runners. Would it have made sense to notify Runner authors, especially since post commit tests are broken? It took me a bit to figure out what caused the regression.

Thanks, Max! Sorry for the inconvenience. It seems like currently both Spark and Flink fail on the same test: org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerAlignBounded. The failure pattern is also the same: the pipeline only produces the output from timer, not from the ProcessElement fn. I think there should be something wrong in the java runner shared library code. Have you worked on it? Or do you want me to follow up fixing this issue?

Issue Time Tracking
---
Worklog Id: (was: 421463)
Time Spent: 22h 10m (was: 22h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421456 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 17:26
Start Date: 13/Apr/20 17:26
Worklog Time Spent: 10m
Work Description: ibzib commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-613000822

@mxm Which post commits are you referring to? & Can you please mark the jira(s) with fix version 2.21.0 so we can fix the regression in the release?

Issue Time Tracking
---
Worklog Id: (was: 421456)
Time Spent: 22h (was: 21h 50m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=421198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-421198 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 13/Apr/20 09:24
Start Date: 13/Apr/20 09:24
Worklog Time Spent: 10m
Work Description: mxm commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-612825390

This is a big change which also affects the runners. Would it have made sense to notify Runner authors, especially since post commit tests are broken? It took me a bit to figure out what caused the regression.

Issue Time Tracking
---
Worklog Id: (was: 421198)
Time Spent: 21h 50m (was: 21h 40m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420517 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 10/Apr/20 21:26
Start Date: 10/Apr/20 21:26
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373

Issue Time Tracking
---
Worklog Id: (was: 420517)
Time Spent: 21h 40m (was: 21.5h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420462 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 10/Apr/20 20:07
Start Date: 10/Apr/20 20:07
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] Update Element.timer and Element.Timer to Element.timers and Element.Timers
URL: https://github.com/apache/beam/pull/11373#discussion_r406921308

## File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go ##
@@ -6242,11 +6289,11 @@ var fileDescriptor_cf57597c3a9659a9 = []byte{
 // Reference imports to suppress errors if they are not otherwise used.
 var _ context.Context
-var _ grpc.ClientConn
+var _ grpc.ClientConnInterface

Review comment:
It's fixed by following https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/model/PROTOBUF.md#generated-go-code-fails-to-build

Issue Time Tracking
---
Worklog Id: (was: 420462)
Time Spent: 21.5h (was: 21h 20m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420456 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 19:59 Start Date: 10/Apr/20 19:59 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] Update Element.timer and Element.Timer to Element.timers and Element.Timers URL: https://github.com/apache/beam/pull/11373#discussion_r406918619 ## File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go ## @@ -6242,11 +6289,11 @@ var fileDescriptor_cf57597c3a9659a9 = []byte{ // Reference imports to suppress errors if they are not otherwise used. var _ context.Context -var _ grpc.ClientConn +var _ grpc.ClientConnInterface Review comment: @lostluck I'm not sure why this line and L6296 were changed. Do you have any idea? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 420456) Time Spent: 21h 20m (was: 21h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 21h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=420445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-420445 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 19:41 Start Date: 10/Apr/20 19:41 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11373: [BEAM-9562] Update Element.timer and Element.Timer to Element.timers and Element.Timers URL: https://github.com/apache/beam/pull/11373#discussion_r406912246 ## File path: model/fn-execution/src/main/proto/beam_fn_api.proto ## @@ -516,7 +516,7 @@ message Elements { repeated Data data = 1; // (Optional) A list of timer byte streams. - repeated Timer timer = 2; + repeated Timer timers = 2; Review comment: Both. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 420445) Time Spent: 21h 10m (was: 21h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 21h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
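Editor's note on the rename discussed above: in protobuf's binary encoding a field is identified by its number and wire type, not by its name, so renaming `timer` to `timers` (or the message `Timer` to `Timers`) while keeping `= 2` should leave serialized bytes unchanged; only generated accessor names and name-based encodings such as JSON are affected. A minimal sketch of the key computation follows; `field_key` is a hypothetical helper written for this note, not part of any protobuf library.

```python
def field_key(field_number, wire_type):
  # Protobuf binary encoding: key = (field_number << 3) | wire_type.
  # The field name never appears in this computation.
  return (field_number << 3) | wire_type


LENGTH_DELIMITED = 2  # wire type used for embedded messages

key_before = field_key(2, LENGTH_DELIMITED)  # `repeated Timer timer = 2;`
key_after = field_key(2, LENGTH_DELIMITED)  # `repeated Timer timers = 2;`
```

Both keys are identical because the field number did not change.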
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419957&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419957 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 02:01 Start Date: 10/Apr/20 02:01 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11373: [BEAM-9562] Update Element.timer to Element.timers URL: https://github.com/apache/beam/pull/11373#discussion_r406567717 ## File path: model/fn-execution/src/main/proto/beam_fn_api.proto ## @@ -516,7 +516,7 @@ message Elements { repeated Data data = 1; // (Optional) A list of timer byte streams. - repeated Timer timer = 2; + repeated Timer timers = 2; Review comment: @robertwb did you want to rename this field or the proto message Timer -> Timers or both? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419957) Time Spent: 21h (was: 20h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 21h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419956 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 02:00 Start Date: 10/Apr/20 02:00 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #11373: [BEAM-9562] Update Element.timer to Element.timers URL: https://github.com/apache/beam/pull/11373#issuecomment-611839103 please regenerate the go protos This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419956) Time Spent: 20h 50m (was: 20h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 20h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419923 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 00:51 Start Date: 10/Apr/20 00:51 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11373: [BEAM-9562] Update Element.timer to Element.timers URL: https://github.com/apache/beam/pull/11373 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419907 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 10/Apr/20 00:19 Start Date: 10/Apr/20 00:19 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419907) Time Spent: 20.5h (was: 20h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 20.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419796 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:36 Start Date: 09/Apr/20 21:36 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406491316 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -536,7 +525,8 @@ def _run_stage(self, runner_execution_context, bundle_context_manager, data_input, -data_output, +data_output, {}, Review comment: yapf helps me put the {} here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419796) Time Spent: 20h 20m (was: 20h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 20h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419793 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:34 Start Date: 09/Apr/20 21:34 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406490454 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1272,6 +1272,8 @@ def expand(self, pcoll): key_coder = coder.key_coder() else: key_coder = coders.registry.get_coder(typehints.Any) + self.window_coder = pcoll.windowing.windowfn.get_window_coder() Review comment: No. Will remove. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419793) Time Spent: 19h 50m (was: 19h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419795 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:34 Start Date: 09/Apr/20 21:34 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406490556 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -987,8 +1019,10 @@ def __init__( def process_bundle(self, inputs, # type: Mapping[str, PartitionableBuffer] - expected_outputs # type: DataOutput -): + expected_outputs, # type: DataOutput + fired_timers, # type: Mapping[str, Mapping[str, PartitionableBuffer]] Review comment: I updated the `fired_timers` implementation but forgot to update the typing here. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419795) Time Spent: 20h 10m (was: 20h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 20h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419794 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:34 Start Date: 09/Apr/20 21:34 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406490504 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id): op.execution_context = execution_context op.start() - # Inject inputs from data plane. + # Each data_channel is mapped to a list of expected inputs which includes + # both data input and timer input. The data input is identied by + # transform_id. The data input is identified by + # (transform_id, timer_family_id). data_channels = collections.defaultdict( list ) # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]] + + # Inject data inputs from data plane. Review comment: Updated the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419794) Time Spent: 20h (was: 19h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 20h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419780 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:15 Start Date: 09/Apr/20 21:15 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406481820 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self): windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing - def to_runner_api_parameter(self, context): + def _get_key_and_window_coder(self, named_inputs): +if named_inputs is None or not self._signature.is_stateful_dofn(): + return None, None +main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0] +input_pcoll = named_inputs[main_input] +kv_type_hint = input_pcoll.element_type +if kv_type_hint and kv_type_hint != typehints.Any: + coder = coders.registry.get_coder(kv_type_hint) + if not coder.is_kv_coder(): +raise ValueError( +'Input elements to the transform %s with stateful DoFn must be ' +'key-value pairs.' % self) + key_coder = coder.key_coder() +else: + key_coder = coders.registry.get_coder(typehints.Any) +window_coder = input_pcoll.windowing.windowfn.get_window_coder() +return key_coder, window_coder + + def to_runner_api(self, context, **extra_kwargs): Review comment: We can delete this override since we pass `extra_kwargs` from `PTransform` now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419780) Time Spent: 19h 40m (was: 19.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419762 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406468331 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -536,7 +525,8 @@ def _run_stage(self, runner_execution_context, bundle_context_manager, data_input, -data_output, +data_output, {}, Review comment: Put {} on its own line. (Surprised yapf didn't complain, or maybe you haven't run it yet.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419762) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419758&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419758 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406444781 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self): windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing - def to_runner_api_parameter(self, context): + def _get_key_and_window_coder(self, named_inputs): +if named_inputs is None or not self._signature.is_stateful_dofn(): + return None, None +main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0] +input_pcoll = named_inputs[main_input] +kv_type_hint = input_pcoll.element_type +if kv_type_hint and kv_type_hint != typehints.Any: + coder = coders.registry.get_coder(kv_type_hint) + if not coder.is_kv_coder(): +raise ValueError( +'Input elements to the transform %s with stateful DoFn must be ' +'key-value pairs.' % self) + key_coder = coder.key_coder() +else: + key_coder = coders.registry.get_coder(typehints.Any) +window_coder = input_pcoll.windowing.windowfn.get_window_coder() +return key_coder, window_coder + + def to_runner_api(self, context, **extra_kwargs): Review comment: This code looks like it's copied from the superclass; instead, just do ``` def to_runner_api(self, context, named_inputs, **extra_kwargs): return super(ParDo, self).to_runner_api(context, named_inputs=named_inputs, **extra_kwargs) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419758) Time Spent: 19h 20m (was: 19h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
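Editor's note: the delegation pattern suggested in the review comment above can be sketched as follows. This is a minimal illustration only; `PTransformBase` and `ParDoLike` are hypothetical stand-ins, not Beam's `PTransform` and `ParDo`, and the dictionary return value is an assumption made to keep the example self-contained.

```python
class PTransformBase(object):
  def to_runner_api(self, context, **extra_kwargs):
    # Stand-in for the shared serialization logic in the base class.
    return {'context': context, 'extra': dict(extra_kwargs)}


class ParDoLike(PTransformBase):
  def to_runner_api(self, context, named_inputs=None, **extra_kwargs):
    # Forward named_inputs to the base class rather than duplicating its body.
    return super(ParDoLike, self).to_runner_api(
        context, named_inputs=named_inputs, **extra_kwargs)


result = ParDoLike().to_runner_api(
    'ctx', named_inputs={'in': 'pc'}, has_parts=True)
```

The subclass adds only the argument it cares about, so later changes to the base-class logic do not need to be mirrored in a copy.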
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419759&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419759 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406473230 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -914,6 +926,17 @@ def process_bundle(self, split_manager = self._select_split_manager() if not split_manager: + # Send the fired timers if any. + for (transform_id, timer_family_id), timers in fired_timers.items(): +self._send_timers_to_worker( +process_bundle_id, transform_id, timer_family_id, timers) + + for transform_id, timer_family_id in ( + set(expected_output_timers.keys()) - set(fired_timers.keys())): +# Close the stream if there is no timers to be sent. Review comment: This is a subtle point. I might write something like "The worker waits for a logical timer stream to be closed for every possible timer, regardless of whether there are any timers to be sent." Maybe it'd be clearer to iterate over `expected_output_timers`, and send `fired_timers.get((transform_id, timer_family_id), [])`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419759) Time Spent: 19.5h (was: 19h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
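Editor's note: the alternative proposed in the review comment above (iterate over every expected timer key and send whatever fired, defaulting to an empty list) might look like the sketch below. The function `send_all_timer_streams`, the callback `send_timers_to_worker`, and the sample keys are hypothetical; the real `_send_timers_to_worker` in the PR also takes a `process_bundle_id`.

```python
from typing import Dict, List, Tuple

TimerKey = Tuple[str, str]  # (transform_id, timer_family_id)


def send_all_timer_streams(
    expected_output_timers,  # type: Dict[TimerKey, object]
    fired_timers,  # type: Dict[TimerKey, List[bytes]]
    send_timers_to_worker):
  # Visit every timer family the worker waits on, not only those that fired,
  # so each logical timer stream gets sent (and therefore closed) exactly once.
  for transform_id, timer_family_id in expected_output_timers:
    timers = fired_timers.get((transform_id, timer_family_id), [])
    send_timers_to_worker(transform_id, timer_family_id, timers)


sent = []
send_all_timer_streams(
    {('pardo', 'a'): None, ('pardo', 'b'): None},
    {('pardo', 'a'): [b't1']},
    lambda t, f, timers: sent.append((t, f, list(timers))))
```

A single loop replaces the two loops and the set difference in the diff, and the empty-list default makes the "close even when nothing fired" behaviour explicit.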
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419760&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419760 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406474463 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -355,20 +364,41 @@ def _build_process_bundle_descriptor(self): items()), environments=dict( self.execution_context.pipeline_components.environments.items()), -state_api_service_descriptor=self.state_api_service_descriptor()) +state_api_service_descriptor=self.state_api_service_descriptor(), +timer_api_service_descriptor=self.data_api_service_descriptor()) def get_input_coder_impl(self, transform_id): # type: (str) -> CoderImpl coder_id = beam_fn_api_pb2.RemoteGrpcPort.FromString( self.process_bundle_descriptor.transforms[transform_id].spec.payload ).coder_id assert coder_id +return self.get_coder_impl(coder_id) + + def _build_timer_coders_id_map(self): +timer_coder_ids = {} +for transform_id, transform_proto in (self._process_bundle_descriptor +.transforms.items()): + if transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn: +pardo_payload = proto_utils.parse_Bytes( +transform_proto.spec.payload, beam_runner_api_pb2.ParDoPayload) +for id, timer_family_spec in pardo_payload.timer_family_specs.items(): + timer_coder_ids[(transform_id, id)] = ( + timer_family_spec.timer_family_coder_id) +return timer_coder_ids + + def get_coder_impl(self, coder_id): if coder_id in self.execution_context.safe_coders: return self.execution_context.pipeline_context.coders[ self.execution_context.safe_coders[coder_id]].get_impl() else: return self.execution_context.pipeline_context.coders[coder_id].get_impl() + 
def get_timer_coder_impl(self, transform_id, timer_family_id): +assert (transform_id, timer_family_id) in self._timer_coder_ids Review comment: The key error if it's not present below will be sufficient. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419760) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
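Editor's note: the point of the review comment above is that a plain dictionary lookup already fails loudly when the key is absent, so a preceding `assert` adds nothing. A minimal sketch, assuming a hypothetical `TimerCoderLookup` class that only mirrors the shape of `_timer_coder_ids`:

```python
class TimerCoderLookup(object):
  def __init__(self, timer_coder_ids):
    self._timer_coder_ids = dict(timer_coder_ids)

  def get_timer_coder_id(self, transform_id, timer_family_id):
    # No assert needed: a missing key raises KeyError carrying the key itself.
    return self._timer_coder_ids[(transform_id, timer_family_id)]


lookup = TimerCoderLookup({('pardo', 'a'): 'coder_1'})
found = lookup.get_timer_coder_id('pardo', 'a')
try:
  lookup.get_timer_coder_id('pardo', 'missing')
  error = None
except KeyError as e:
  error = e
```

The `KeyError` also reports which key was missing, which a bare `assert` without a message would not.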
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419755&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419755 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406465514 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -837,25 +869,59 @@ def process_bundle(self, instruction_id): op.execution_context = execution_context op.start() - # Inject inputs from data plane. + # Each data_channel is mapped to a list of expected inputs which includes + # both data input and timer input. The data input is identied by + # transform_id. The data input is identified by + # (transform_id, timer_family_id). data_channels = collections.defaultdict( list ) # type: DefaultDict[data_plane.GrpcClientDataChannel, List[str]] + + # Inject data inputs from data plane. Review comment: This comment is a bit misleading, as the injection doesn't happen in this for loop. (Similarly with timers.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419755) Time Spent: 19h 10m (was: 19h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 19h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419757 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406467481

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ##
@@ -987,8 +1019,10 @@ def __init__(
   def process_bundle(self,
                      inputs,  # type: Mapping[str, PartitionableBuffer]
-                     expected_outputs  # type: DataOutput
-                     ):
+                     expected_outputs,  # type: DataOutput
+                     fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]

Review comment:
For consistency, should this be a `Mapping[Tuple[str, str], PartitionableBuffer]`?

Issue Time Tracking
---
Worklog Id: (was: 419757)
Time Spent: 19h 20m (was: 19h 10m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 19h 20m
> Remaining Estimate: 0h
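The review comment above contrasts two shapes for the fired-timer buffers: a nested mapping and a mapping keyed by a (transform_id, timer_family_id) tuple. A minimal sketch of the difference follows. It assumes plain lists as stand-ins for PartitionableBuffer, and the helper name `flatten_fired_timers` and the sample ids are hypothetical, not taken from fn_runner.py.

```python
from typing import Dict, List, Mapping, Tuple

# Stand-in for PartitionableBuffer: a plain list of encoded timers (assumption).
Buffer = List[bytes]


def flatten_fired_timers(
    nested: Mapping[str, Mapping[str, Buffer]]
) -> Dict[Tuple[str, str], Buffer]:
  """Turns {transform_id: {timer_family_id: buffer}} into a tuple-keyed dict."""
  return {
      (transform_id, timer_family_id): buffer
      for transform_id, families in nested.items()
      for timer_family_id, buffer in families.items()
  }


# Hypothetical ids, for illustration only.
nested_timers = {
    'pardo_1': {'ts-gc': [b'a'], 'ts-flush': [b'b']},
    'pardo_2': {'ts-gc': [b'c']},
}
flat_timers = flatten_fired_timers(nested_timers)
```

With the tuple-keyed form a lookup is a single step, `flat_timers[('pardo_1', 'ts-gc')]`, and the key has the same shape as the (transform_id, timer_family_id) pairs mentioned elsewhere in this review.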
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419756 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 21:01 Start Date: 09/Apr/20 21:01 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406466215 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py ## @@ -1321,80 +1321,18 @@ def remove_data_plane_ops(stages, pipeline_context): yield stage -def inject_timer_pcollections(stages, pipeline_context): +def setup_timer_mapping(stages, pipeline_context): # type: (Iterable[Stage], TransformContext) -> Iterator[Stage] - """Create PCollections for fired timers and to-be-set timers. - - At execution time, fired timers and timers-to-set are represented as - PCollections that are managed by the runner. This phase adds the - necissary collections, with their read and writes, to any stages using - timers. + """Set up a mapping of {transform_id: [timer_ids]} for each stage. """ for stage in stages: -for transform in list(stage.transforms): +for transform in stage.transforms: if transform.spec.urn in PAR_DO_URNS: payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) -for tag, spec in payload.timer_family_specs.items(): - if len(transform.inputs) > 1: -raise NotImplementedError('Timers and side inputs.') - input_pcoll = pipeline_context.components.pcollections[next( - iter(transform.inputs.values()))] - # Create the appropriate coder for the timer PCollection. 
- key_coder_id = input_pcoll.coder_id - if (pipeline_context.components.coders[key_coder_id].spec.urn == - common_urns.coders.KV.urn): -key_coder_id = pipeline_context.components.coders[ -key_coder_id].component_coder_ids[0] - key_timer_coder_id = pipeline_context.add_or_get_coder_id( - beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.coders.KV.urn), - component_coder_ids=[ - key_coder_id, spec.timer_family_coder_id - ])) - # Inject the read and write pcollections. - timer_read_pcoll = unique_name( - pipeline_context.components.pcollections, - '%s_timers_to_read_%s' % (transform.unique_name, tag)) - timer_write_pcoll = unique_name( - pipeline_context.components.pcollections, - '%s_timers_to_write_%s' % (transform.unique_name, tag)) - pipeline_context.components.pcollections[timer_read_pcoll].CopyFrom( - beam_runner_api_pb2.PCollection( - unique_name=timer_read_pcoll, - coder_id=key_timer_coder_id, - windowing_strategy_id=input_pcoll.windowing_strategy_id, - is_bounded=input_pcoll.is_bounded)) - pipeline_context.components.pcollections[timer_write_pcoll].CopyFrom( - beam_runner_api_pb2.PCollection( - unique_name=timer_write_pcoll, - coder_id=key_timer_coder_id, - windowing_strategy_id=input_pcoll.windowing_strategy_id, - is_bounded=input_pcoll.is_bounded)) - stage.transforms.append( - beam_runner_api_pb2.PTransform( - unique_name=timer_read_pcoll + '/Read', - outputs={'out': timer_read_pcoll}, - spec=beam_runner_api_pb2.FunctionSpec( - urn=bundle_processor.DATA_INPUT_URN, - payload=create_buffer_id(timer_read_pcoll, - kind='timers' - stage.transforms.append( - beam_runner_api_pb2.PTransform( - unique_name=timer_write_pcoll + '/Write', - inputs={'in': timer_write_pcoll}, - spec=beam_runner_api_pb2.FunctionSpec( - urn=bundle_processor.DATA_OUTPUT_URN, - payload=create_buffer_id( - timer_write_pcoll, kind='timers' - assert tag not in transform.inputs - transform.inputs[tag] = timer_read_pcoll - assert tag not in transform.outputs - 
transform.outputs[tag] = timer_write_pcoll - stage.timer_pcollections.append( - (timer_read_pcoll + '/Read', timer_write_pcoll)) +for timer_family_id in payload.timer_family_specs.keys(): + stage.
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419761 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406471091

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ##
@@ -896,7 +906,9 @@ def _generate_splits_for_testing(self,
   def process_bundle(self,
                      inputs,  # type: Mapping[str, PartitionableBuffer]
-                     expected_outputs  # type: DataOutput
+                     expected_outputs,  # type: DataOutput
+                     fired_timers,  # type: Mapping[str, Mapping[str, PartitionableBuffer]]

Review comment:
Mapping[Tuple[str, str], PartitionableBuffer]?

Issue Time Tracking
---
Worklog Id: (was: 419761)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 19.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419754 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 21:01
Start Date: 09/Apr/20 21:01
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406444656

## File path: sdks/python/apache_beam/transforms/core.py ##
@@ -1272,6 +1272,8 @@ def expand(self, pcoll):
         key_coder = coder.key_coder()
       else:
         key_coder = coders.registry.get_coder(typehints.Any)
+      self.window_coder = pcoll.windowing.windowfn.get_window_coder()

Review comment:
Are these still used?

Issue Time Tracking
---
Worklog Id: (was: 419754)
Time Spent: 19h (was: 18h 50m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 19h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419695 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 19:56
Start Date: 09/Apr/20 19:56
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406442486

## File path: sdks/python/apache_beam/transforms/core.py ##
@@ -1323,12 +1325,47 @@ def _pardo_fn_data(self):
       windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
-  def to_runner_api_parameter(self, context):
+  def _get_key_and_window_coder(self, named_inputs):
+    if named_inputs is None or not self._signature.is_stateful_dofn():
+      return None, None
+    main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0]
+    input_pcoll = named_inputs[main_input]
+    kv_type_hint = input_pcoll.element_type
+    if kv_type_hint and kv_type_hint != typehints.Any:
+      coder = coders.registry.get_coder(kv_type_hint)
+      if not coder.is_kv_coder():
+        raise ValueError(
+            'Input elements to the transform %s with stateful DoFn must be '
+            'key-value pairs.' % self)
+      key_coder = coder.key_coder()
+    else:
+      key_coder = coders.registry.get_coder(typehints.Any)
+    window_coder = input_pcoll.windowing.windowfn.get_window_coder()
+    return key_coder, window_coder
+
+  def to_runner_api(self, context, **extra_kwargs):
+    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    has_parts = extra_kwargs.get('has_part', False)
+    urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)

Review comment:
Nevermind, I see what's going on here.

Issue Time Tracking
---
Worklog Id: (was: 419695)
Time Spent: 18h 50m (was: 18h 40m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 18h 50m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419622 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 18:26
Start Date: 09/Apr/20 18:26
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406394861

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
+  def extract_timers_info(self):

Review comment:
The keys of maps should still be the same though(tuple of (transform_id, timer_family_id)). That's why I make the value as a map{coder_impl, output_stream}

Issue Time Tracking
---
Worklog Id: (was: 419622)
Time Spent: 18h 40m (was: 18.5h)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 18h 40m
> Remaining Estimate: 0h
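The thread on `extract_timers_info` describes a map keyed by (transform_id, timer_family_id) whose value holds a coder known up front and an output stream that can only be filled in once an instruction_id exists. A rough sketch of that two-phase shape follows. The class `TimerInfo`, the function `attach_output_streams` and the `open_stream` callback are hypothetical names for illustration, not the code in bundle_processor.py.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class TimerInfo(object):
  """Hypothetical holder: coder set at construction, stream set per bundle."""
  def __init__(self, timer_coder_impl):
    # type: (Any) -> None
    self.timer_coder_impl = timer_coder_impl
    self.output_stream = None  # type: Optional[Any]


def attach_output_streams(
    timers_info,  # type: Dict[Tuple[str, str], TimerInfo]
    instruction_id,  # type: str
    open_stream  # type: Callable[[str, str, str], Any]
):
  # type: (...) -> None
  """Fills in output_stream once the bundle's instruction_id is known."""
  for (transform_id, timer_family_id), info in timers_info.items():
    info.output_stream = open_stream(
        instruction_id, transform_id, timer_family_id)


# Placeholder coder and a fake stream factory that just records its arguments.
timers_info = {('pardo_1', 'ts-gc'): TimerInfo('coder-placeholder')}
attach_output_streams(
    timers_info, 'bundle_7', lambda i, t, f: (i, t, f))
```

Keeping both fields in one value means the key set never changes between setup and bundle processing, which seems to be the point made in the comment above; two separate maps, as the reviewer suggested, would carry the same information.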
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419611 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 18:14
Start Date: 09/Apr/20 18:14
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406387842

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
+  def extract_timers_info(self):

Review comment:
Ack, expanding the whole diff I see that this is happening in different methods now (in which case here two separate maps, as you had originally, might be preferable). But not a big deal.

Issue Time Tracking
---
Worklog Id: (was: 419611)
Time Spent: 18.5h (was: 18h 20m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 18.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419607 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 18:05
Start Date: 09/Apr/20 18:05
Worklog Time Spent: 10m
Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r406382954

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -1088,6 +1145,21 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)
+  def extract_timers_info(self):

Review comment:
We can only populate output_stream when processing bundle since instruction_id is required.

Issue Time Tracking
---
Worklog Id: (was: 419607)
Time Spent: 18h 20m (was: 18h 10m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 18h 20m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419547 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 16:58 Start Date: 09/Apr/20 16:58 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406344891 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -562,45 +562,45 @@ class OutputTimer(object): def __init__(self, key, window, # type: windowed_value.BoundedWindow - receiver # type: operations.ConsumerSet + paneinfo, + timer_family_id, + timer_coder_impl, + output_stream ): self._key = key self._window = window -self._receiver = receiver +self._paneinfo = paneinfo +self._timer_family_id = timer_family_id +self._output_stream = output_stream +self._timer_coder_impl = timer_coder_impl def set(self, ts): ts = timestamp.Timestamp.of(ts) -# TODO(BEAM-9562): Plumb through actual timer fields. -self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=ts, -hold_timestamp=ts, -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - ts, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=ts, +hold_timestamp=ts, +paneinfo=self._paneinfo) +self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True) +self._output_stream.maybe_flush() def clear(self): # type: () -> None dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000) -# TODO(BEAM-9562): Plumb through actual paneinfo. 
-self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=timestamp.Timestamp.of(clear_ts), -hold_timestamp=timestamp.Timestamp.of(0), -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - 0, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=clear_ts, Review comment: Correct, when `clear_bit` is `True`, the coder ignores these fields. I think we should have a better `Timer` with API `of` and `clear` like in Java as a follow up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419547) Time Spent: 18h 10m (was: 18h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 18h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
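The comment above proposes, as a follow-up, a `Timer` with `of` and `clear` constructors like the Java SDK, so that a cleared timer does not need dummy fire and hold timestamps. A minimal sketch of what such an API might look like follows. It is an assumption about a possible design, not the actual `userstate.Timer`; the field names simply mirror the keyword arguments in the quoted diff.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class Timer(object):
  """Hypothetical timer record; not the real userstate.Timer."""
  user_key: Any
  dynamic_timer_tag: str
  windows: Tuple[Any, ...]
  clear_bit: bool
  # Only meaningful when clear_bit is False.
  fire_timestamp: Optional[int] = None
  hold_timestamp: Optional[int] = None
  paneinfo: Optional[Any] = None

  @classmethod
  def of(cls, user_key, dynamic_timer_tag, windows, fire_timestamp,
         hold_timestamp, paneinfo):
    """Builds a timer that is set to fire."""
    return cls(
        user_key, dynamic_timer_tag, tuple(windows), False, fire_timestamp,
        hold_timestamp, paneinfo)

  @classmethod
  def clear(cls, user_key, dynamic_timer_tag, windows):
    """Builds a cleared timer; the timestamp and pane fields stay unset."""
    return cls(user_key, dynamic_timer_tag, tuple(windows), True)


set_timer = Timer.of('key', '', ('window',), 10, 10, None)
cleared_timer = Timer.clear('key', '', ('window',))
```

Under this design the caller of `clear` never supplies placeholder timestamps, which matches the remark that those fields are ignored when `clear_bit` is `True`.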
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419496 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 15:59
Start Date: 09/Apr/20 15:59
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611607510

Run Python2_PVR_Flink PreCommit

Issue Time Tracking
---
Worklog Id: (was: 419496)
Time Spent: 18h (was: 17h 50m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 18h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419439 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 14:55
Start Date: 09/Apr/20 14:55
Worklog Time Spent: 10m
Work Description: lukecwik commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#issuecomment-611572961

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 419439)
Time Spent: 17h 50m (was: 17h 40m)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 17h 50m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419191 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405989307 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -562,45 +562,45 @@ class OutputTimer(object): def __init__(self, key, window, # type: windowed_value.BoundedWindow - receiver # type: operations.ConsumerSet + paneinfo, + timer_family_id, + timer_coder_impl, + output_stream ): self._key = key self._window = window -self._receiver = receiver +self._paneinfo = paneinfo +self._timer_family_id = timer_family_id +self._output_stream = output_stream +self._timer_coder_impl = timer_coder_impl def set(self, ts): ts = timestamp.Timestamp.of(ts) -# TODO(BEAM-9562): Plumb through actual timer fields. -self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=ts, -hold_timestamp=ts, -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - ts, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=ts, +hold_timestamp=ts, +paneinfo=self._paneinfo) +self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True) +self._output_stream.maybe_flush() def clear(self): # type: () -> None dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000) -# TODO(BEAM-9562): Plumb through actual paneinfo. 
-self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=timestamp.Timestamp.of(clear_ts), -hold_timestamp=timestamp.Timestamp.of(0), -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - 0, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=clear_ts, Review comment: They're meaningless when we're clearing a timer (e.g. it won't fire, hold back the watermark, or have a pane info). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419191) Time Spent: 17h (was: 16h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419201 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406003424 ## File path: sdks/python/apache_beam/runners/worker/data_plane.py ## @@ -408,27 +470,67 @@ def close_callback(data): return ClosableOutputStream.create( close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def output_timer_stream(self, instruction_id, transform_id, timer_family_id): +def add_to_send_queue(timer): + if timer: +self._to_send.put( +beam_fn_api_pb2.Elements.Timer( +instruction_id=instruction_id, +transform_id=transform_id, +timer_family_id=timer_family_id, +timers=timer, +is_last=False)) + +def close_callback(timer): + add_to_send_queue(timer) + self._to_send.put( + beam_fn_api_pb2.Elements.Timer( + instruction_id=instruction_id, + transform_id=transform_id, + timer_family_id=timer_family_id, + timers=b'', + is_last=True)) + +return ClosableOutputStream.create( +close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def _write_outputs(self): # type: () -> Iterator[beam_fn_api_pb2.Elements] -done = False -while not done: - data = [self._to_send.get()] - try: -# Coalesce up to 100 other items. -for _ in range(100): - data.append(self._to_send.get_nowait()) - except queue.Empty: -pass - if data[-1] is self._WRITES_FINISHED: -done = True -data.pop() - if data: -yield beam_fn_api_pb2.Elements(data=data) +stream_done = False +while not stream_done: + streams = None + if not stream_done: +streams = [self._to_send.get()] +try: + # Coalesce up to 100 other items. 
+ for _ in range(100): +streams.append(self._to_send.get_nowait()) +except queue.Empty: + pass +if streams and streams[-1] is self._WRITES_FINISHED: + stream_done = True + streams.pop() + if streams: +elements = beam_fn_api_pb2.Elements() +data_stream = [] +timer_stream = [] +for stream in streams: + if isinstance(stream, beam_fn_api_pb2.Elements.Timer): +timer_stream.append(stream) + if isinstance(stream, beam_fn_api_pb2.Elements.Data): +data_stream.append(stream) +if data_stream: Review comment: No need to have these conditionals, you can just write `yield beam_fn_api_pb2.Elements(data=data_stream, timer=timer_stream)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419201) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
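The suggestion in the review comment above is to drop the `if data_stream:` style conditionals and build one `Elements` message from both lists, since an empty list is a valid value for a repeated field. A self-contained sketch of that batching loop follows. `Data`, `Timer` and `Elements` are hypothetical stand-ins for the `beam_fn_api_pb2` messages, and the `timer` field name follows the keyword used in the review comment rather than a verified proto definition.

```python
import queue
from typing import Iterator, List, NamedTuple


class Data(NamedTuple):
  """Stand-in for beam_fn_api_pb2.Elements.Data (assumption)."""
  payload: bytes


class Timer(NamedTuple):
  """Stand-in for beam_fn_api_pb2.Elements.Timer (assumption)."""
  payload: bytes


class Elements(NamedTuple):
  """Stand-in for beam_fn_api_pb2.Elements (assumption)."""
  data: List[Data]
  timer: List[Timer]


WRITES_FINISHED = object()  # Sentinel marking the end of all writes.


def write_outputs(to_send, batch_size=100):
  # type: (queue.Queue, int) -> Iterator[Elements]
  """Drains the queue in batches, splitting each batch by message type."""
  done = False
  while not done:
    items = [to_send.get()]
    try:
      # Coalesce up to batch_size other items without blocking.
      for _ in range(batch_size):
        items.append(to_send.get_nowait())
    except queue.Empty:
      pass
    if items[-1] is WRITES_FINISHED:
      done = True
      items.pop()
    if items:
      # No per-list conditionals: empty lists are passed through as-is.
      yield Elements(
          data=[i for i in items if isinstance(i, Data)],
          timer=[i for i in items if isinstance(i, Timer)])


to_send = queue.Queue()
for item in (Data(b'd1'), Timer(b't1'), Data(b'd2'), WRITES_FINISHED):
  to_send.put(item)
batches = list(write_outputs(to_send))
```

Like the loop in the diff, this sketch assumes the sentinel is the last item ever enqueued, so it can only appear at the end of a batch.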
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419203 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405999089

## File path: sdks/python/apache_beam/runners/worker/data_plane.py ##
@@ -354,15 +410,15 @@ def input_elements(self,
     Args:
       instruction_id(str): instruction_id for which data is read
-      expected_transforms(collection): expected transforms
+      expected_inputs(collection): expected inputs, include both data and timer.
     """
     received = self._receiving_queue(instruction_id)
-    done_transforms = set()  # type: Set[str]
+    done_inputs = set()  # type: Set[str]

Review comment:
update type hint

Issue Time Tracking
---
Worklog Id: (was: 419203)
Time Spent: 17h 40m (was: 17.5h)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 17h 40m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419198&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419198 ]

ASF GitHub Bot logged work on BEAM-9562:

Author: ASF GitHub Bot
Created on: 09/Apr/20 07:21
Start Date: 09/Apr/20 07:21
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements
URL: https://github.com/apache/beam/pull/11314#discussion_r405998009

## File path: sdks/python/apache_beam/runners/worker/data_plane.py ##
@@ -274,20 +301,47 @@ def inverse(self):
     return self._inverse
   def input_elements(self,
-                     instruction_id,  # type: str
-                     unused_expected_transforms=None,  # type: Optional[Collection[str]]
-                     abort_callback=None  # type: Optional[Callable[[], bool]]
-                     ):
-    # type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data]

Review comment:
It'd be good to not lose the typing information. You can make an alias at the top of the file `DataOrTimers = Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timer]` to cut down on verbosity, here and elsewhere.

Issue Time Tracking
---
Worklog Id: (was: 419198)

> Remove timer from PCollection and treat timers as Elements
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 17h 20m
> Remaining Estimate: 0h
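The alias suggested in the review comment above can be illustrated as follows. This sketch uses two hypothetical stand-in classes in place of `beam_fn_api_pb2.Elements.Data` and `beam_fn_api_pb2.Elements.Timer` so that it runs without Beam installed; only the `DataOrTimers` alias itself is taken from the comment.

```python
from typing import Iterator, List, NamedTuple, Union


class Data(NamedTuple):
  """Hypothetical stand-in for beam_fn_api_pb2.Elements.Data."""
  instruction_id: str
  transform_id: str
  data: bytes


class Timer(NamedTuple):
  """Hypothetical stand-in for beam_fn_api_pb2.Elements.Timer."""
  instruction_id: str
  transform_id: str
  timer_family_id: str
  timers: bytes


# Declared once at module level, then reused in every signature.
DataOrTimers = Union[Data, Timer]


def input_elements(received):
  # type: (List[DataOrTimers]) -> Iterator[DataOrTimers]
  """Yields queued elements unchanged; only the type comment matters here."""
  for element in received:
    yield element


elements = list(
    input_elements([
        Data('i1', 't1', b'x'),
        Timer('i1', 't1', 'family', b'y'),
    ]))
```

The alias keeps the comment-style type hints short while still telling readers and type checkers that both message kinds flow through the same iterator.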
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419189&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419189 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405985691 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self): windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing - def to_runner_api_parameter(self, context): + def _get_key_and_window_coder(self, named_inputs): +if named_inputs is None or not self._signature.is_stateful_dofn(): + return None, None +main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0] +input_pcoll = named_inputs[main_input] +kv_type_hint = input_pcoll.element_type +if kv_type_hint and kv_type_hint != typehints.Any: + coder = coders.registry.get_coder(kv_type_hint) + if not coder.is_kv_coder(): +raise ValueError( +'Input elements to the transform %s with stateful DoFn must be ' +'key-value pairs.' % self) + key_coder = coder.key_coder() +else: + key_coder = coders.registry.get_coder(typehints.Any) +window_coder = input_pcoll.windowing.windowfn.get_window_coder() +return key_coder, window_coder + + def to_runner_api(self, context, **extra_kwargs): +# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec +has_parts = extra_kwargs.get('has_part', False) Review comment: You can leave this in the parameter list. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 419189) Time Spent: 16h 50m (was: 16h 40m)
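Editorial sketch of what "leave this in the parameter list" could look like. The URN constants and the trimmed `to_runner_api_parameter` are hypothetical stand-ins, and whether `has_parts` should also be forwarded to `to_runner_api_parameter` is not visible in this excerpt, so the sketch assumes it is not. Note that the diff reads the key `'has_part'` into a variable named `has_parts`; an explicit keyword parameter avoids having to keep such a string key in sync with callers.

```python
# Hypothetical stand-ins for the URNs referenced in the diff.
GENERIC_COMPOSITE_TRANSFORM = 'generic_composite'
PICKLED_TRANSFORM = 'pickled'


def to_runner_api_parameter(context, **extra_kwargs):
    """Trimmed stand-in: always reports the generic composite URN."""
    return GENERIC_COMPOSITE_TRANSFORM, b'payload'


def to_runner_api(context, has_parts=False, **extra_kwargs):
    """Names has_parts explicitly instead of reading it from extra_kwargs."""
    urn, payload = to_runner_api_parameter(context, **extra_kwargs)
    if urn == GENERIC_COMPOSITE_TRANSFORM and not has_parts:
        # Fallback path from the diff, reduced to a URN swap for illustration.
        urn = PICKLED_TRANSFORM
    return urn, payload
```

A caller that misspells the keyword (for example `has_part=True`) now has it land in `extra_kwargs` where it can be inspected, rather than silently matching or missing a `.get()` lookup.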
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419197&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419197 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405999346 ## File path: sdks/python/apache_beam/runners/worker/data_plane.py ## @@ -372,12 +428,18 @@ def input_elements(self, t, v, tb = self._exc_info raise_(t, v, tb) else: - # TODO(BEAM-9558): Cleanup once dataflow is updated. - if not data.data or data.is_last: -done_transforms.add(data.transform_id) - else: -assert data.transform_id not in done_transforms -yield data + if isinstance(element, beam_fn_api_pb2.Elements.Timer): +if element.is_last: + done_inputs.add((element.transform_id, element.timer_family_id)) +else: + yield element + if isinstance(element, beam_fn_api_pb2.Elements.Data): Review comment: elif This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419197) Time Spent: 17h 20m (was: 17h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
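Editorial sketch of the `elif` dispatch requested above. The dataclasses are hypothetical stand-ins for the `Elements.Timer` and `Elements.Data` protos, and the body of the `Data` branch is an assumption, since the diff is cut off at that line.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Set, Tuple, Union


@dataclass
class Data:  # hypothetical stand-in for beam_fn_api_pb2.Elements.Data
    transform_id: str
    data: bytes = b''
    is_last: bool = False


@dataclass
class Timer:  # hypothetical stand-in for beam_fn_api_pb2.Elements.Timer
    transform_id: str
    timer_family_id: str
    timers: bytes = b''
    is_last: bool = False


# Data inputs are keyed by transform id, timer inputs by (transform, family).
DoneKey = Union[str, Tuple[str, str]]


def input_elements(
    received: Iterable[Union[Data, Timer]]) -> Iterator[Union[Data, Timer]]:
    done_inputs = set()  # type: Set[DoneKey]
    for element in received:
        if isinstance(element, Timer):
            if element.is_last:
                done_inputs.add(
                    (element.transform_id, element.timer_family_id))
            else:
                yield element
        elif isinstance(element, Data):  # checked only if not a Timer
            if element.is_last:
                done_inputs.add(element.transform_id)
            else:
                assert element.transform_id not in done_inputs
                yield element
        else:
            raise ValueError('Unexpected element type: %r' % type(element))
```

Because the two types are mutually exclusive, `elif` skips a second `isinstance` check for timers and leaves room for a final `else` that rejects anything unexpected.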
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419200 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406003474 ## File path: sdks/python/apache_beam/runners/worker/data_plane.py ## @@ -408,27 +470,67 @@ def close_callback(data): return ClosableOutputStream.create( close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def output_timer_stream(self, instruction_id, transform_id, timer_family_id): +def add_to_send_queue(timer): + if timer: +self._to_send.put( +beam_fn_api_pb2.Elements.Timer( +instruction_id=instruction_id, +transform_id=transform_id, +timer_family_id=timer_family_id, +timers=timer, +is_last=False)) + +def close_callback(timer): + add_to_send_queue(timer) + self._to_send.put( + beam_fn_api_pb2.Elements.Timer( + instruction_id=instruction_id, + transform_id=transform_id, + timer_family_id=timer_family_id, + timers=b'', + is_last=True)) + +return ClosableOutputStream.create( +close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def _write_outputs(self): # type: () -> Iterator[beam_fn_api_pb2.Elements] -done = False -while not done: - data = [self._to_send.get()] - try: -# Coalesce up to 100 other items. -for _ in range(100): - data.append(self._to_send.get_nowait()) - except queue.Empty: -pass - if data[-1] is self._WRITES_FINISHED: -done = True -data.pop() - if data: -yield beam_fn_api_pb2.Elements(data=data) +stream_done = False +while not stream_done: + streams = None + if not stream_done: +streams = [self._to_send.get()] +try: + # Coalesce up to 100 other items. 
+ for _ in range(100): +streams.append(self._to_send.get_nowait()) +except queue.Empty: + pass +if streams and streams[-1] is self._WRITES_FINISHED: + stream_done = True + streams.pop() + if streams: +elements = beam_fn_api_pb2.Elements() +data_stream = [] +timer_stream = [] +for stream in streams: + if isinstance(stream, beam_fn_api_pb2.Elements.Timer): +timer_stream.append(stream) + if isinstance(stream, beam_fn_api_pb2.Elements.Data): Review comment: else This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419200) Time Spent: 17.5h (was: 17h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
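Editorial sketch of the `else` variant suggested above for splitting a coalesced batch into timers and data. `Data` and `Timer` are hypothetical stand-ins for the proto classes, and `partition_streams` is an illustrative helper name that does not appear in the PR.

```python
class Data:
    """Hypothetical stand-in for beam_fn_api_pb2.Elements.Data."""


class Timer:
    """Hypothetical stand-in for beam_fn_api_pb2.Elements.Timer."""


def partition_streams(streams):
    """Splits a batch into (data_stream, timer_stream), preserving order."""
    data_stream = []
    timer_stream = []
    for stream in streams:
        if isinstance(stream, Timer):
            timer_stream.append(stream)
        else:
            # Everything that is not a Timer is treated as Data.
            data_stream.append(stream)
    return data_stream, timer_stream
```

The trade-off is that a plain `else` routes any non-timer object to the data list, so it relies on the queue only ever holding those two kinds of element.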
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419196 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405998237 ## File path: sdks/python/apache_beam/runners/worker/data_plane.py ## @@ -274,20 +301,47 @@ def inverse(self): return self._inverse def input_elements(self, - instruction_id, # type: str - unused_expected_transforms=None, # type: Optional[Collection[str]] - abort_callback=None # type: Optional[Callable[[], bool]] -): -# type: (...) -> Iterator[beam_fn_api_pb2.Elements.Data] + instruction_id, # type: str + unused_expected_inputes=None, # type: Collection[str] Review comment: inputes -> inputs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419196) Time Spent: 17h 20m (was: 17h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419194&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419194 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405994886 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -745,6 +746,24 @@ def __init__(self, self.process_bundle_descriptor = process_bundle_descriptor self.state_handler = state_handler self.data_channel_factory = data_channel_factory + +# There is no guarantee that the runner only set +# timer_api_service_descriptor when having timers. So this field cannot be +# used as an indicator of timers. +if self.process_bundle_descriptor.timer_api_service_descriptor: + self.timer_data_channel = ( + data_channel_factory.create_data_channel_from_url( + self.process_bundle_descriptor.timer_api_service_descriptor.url)) +else: + self.timer_data_channel = None + +# A mapping of +# {(transform_id, timer_family_id): +# {"timer_coder_impl": coder, "output_stream": stream}} Review comment: Optional: [Named] tuples are usually easier to work with than dicts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
Issue Time Tracking --- Worklog Id: (was: 419194) Time Spent: 17h 10m (was: 17h)
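Editorial sketch of the named-tuple alternative mentioned in the review comment. `TimerInfo` and `register_timer` are illustrative names chosen here, not necessarily what the PR ended up using; the field names mirror the dict keys quoted in the diff.

```python
from typing import Any, Dict, NamedTuple, Optional, Tuple


class TimerInfo(NamedTuple):
    """Replaces {"timer_coder_impl": coder, "output_stream": stream}."""
    timer_coder_impl: Any
    output_stream: Optional[Any] = None


# Keyed by (transform_id, timer_family_id), as in the diff's comment.
TimerKey = Tuple[str, str]


def register_timer(timers_info: Dict[TimerKey, TimerInfo],
                   transform_id: str,
                   timer_family_id: str,
                   coder_impl: Any,
                   output_stream: Optional[Any] = None) -> None:
    """Stores one immutable record per (transform, timer family) pair."""
    timers_info[(transform_id, timer_family_id)] = TimerInfo(
        coder_impl, output_stream)
```

Fields are then read as attributes (`info.timer_coder_impl`) or unpacked positionally, and a misspelled field name fails immediately instead of producing a `KeyError` deep in bundle processing.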
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419193&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419193 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405990412 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -745,6 +746,24 @@ def __init__(self, self.process_bundle_descriptor = process_bundle_descriptor self.state_handler = state_handler self.data_channel_factory = data_channel_factory + +# There is no guarantee that the runner only set +# timer_api_service_descriptor when having timers. So this field cannot be +# used as an indicator of timers. +if self.process_bundle_descriptor.timer_api_service_descriptor: + self.timer_data_channel = ( + data_channel_factory.create_data_channel_from_url( + self.process_bundle_descriptor.timer_api_service_descriptor.url)) +else: + self.timer_data_channel = None + +# A mapping of +# {(transform_id, timer_family_id): +# {"timer_coder_impl": coder, "output_stream": stream}} +# The mapping keeps empty when there is no timer_family_specs in the Review comment: Nit: The mapping stays (or is) empty... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 419193) Time Spent: 17h 10m (was: 17h)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419195&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419195 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405995554 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -1088,6 +1145,21 @@ def create_operation(self, transform_proto.spec.payload, parameter_type) return creator(self, transform_id, transform_proto, payload, consumers) + def extract_timers_info(self): Review comment: I would populate output_stream here as well rather than above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419195) Time Spent: 17h 20m (was: 17h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419188&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419188 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405986529 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self): windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing - def to_runner_api_parameter(self, context): + def _get_key_and_window_coder(self, named_inputs): +if named_inputs is None or not self._signature.is_stateful_dofn(): + return None, None +main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0] +input_pcoll = named_inputs[main_input] +kv_type_hint = input_pcoll.element_type +if kv_type_hint and kv_type_hint != typehints.Any: + coder = coders.registry.get_coder(kv_type_hint) + if not coder.is_kv_coder(): +raise ValueError( +'Input elements to the transform %s with stateful DoFn must be ' +'key-value pairs.' % self) + key_coder = coder.key_coder() +else: + key_coder = coders.registry.get_coder(typehints.Any) +window_coder = input_pcoll.windowing.windowfn.get_window_coder() +return key_coder, window_coder + + def to_runner_api(self, context, **extra_kwargs): +# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec +has_parts = extra_kwargs.get('has_part', False) +urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs) +if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts: + # TODO(BEAM-3812): Remove this fallback. 
+ urn, typed_param = self.to_runner_api_pickled(context) +return beam_runner_api_pb2.FunctionSpec( +urn=urn, +payload=typed_param.SerializeToString() if isinstance( +typed_param, message.Message) else typed_param.encode('utf-8') +if isinstance(typed_param, str) else typed_param) + + def to_runner_api_parameter(self, context, **extra_kwargs): # type: (PipelineContext) -> typing.Tuple[str, message.Message] assert isinstance(self, ParDo), \ "expected instance of ParDo, but got %s" % self.__class__ +key_coder, window_coder = self._get_key_and_window_coder( Review comment: Maybe put this in the if block below closer to where they're used? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419188) Time Spent: 16h 40m (was: 16.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 16h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419202&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419202 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406004352 ## File path: sdks/python/apache_beam/runners/worker/operations.pxd ## @@ -92,7 +92,7 @@ cdef class DoOperation(Operation): cdef DoFnRunner dofn_runner cdef object tagged_receivers cdef object side_input_maps - cdef object user_state_context + cpdef public object user_state_context Review comment: Rather than making this public, I would add an `add_timer_info` method to this operation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419202) Time Spent: 17h 40m (was: 17.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
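Editorial sketch of the encapsulation suggested above, written in plain Python rather than Cython. Both classes are trimmed, hypothetical stand-ins; raising when no state context exists is an assumption made for the example.

```python
class UserStateContext:
    """Hypothetical, trimmed stand-in for the worker's user state context."""
    def __init__(self):
        self._timer_info = {}

    def add_timer_info(self, timer_family_id, timer_info):
        self._timer_info[timer_family_id] = timer_info


class DoOperation:
    """Keeps the state context private and exposes one narrow method."""
    def __init__(self, user_state_context=None):
        self._user_state_context = user_state_context

    def add_timer_info(self, timer_family_id, timer_info):
        if self._user_state_context is None:
            # Assumption: registering timers on a stateless DoFn is an error.
            raise ValueError('Operation has no user state context.')
        self._user_state_context.add_timer_info(timer_family_id, timer_info)
```

Callers such as the bundle processor then depend on `add_timer_info` only, so the attribute can stay a plain `cdef object` in the `.pxd` file.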
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419190&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419190 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405987932 ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1323,12 +1325,47 @@ def _pardo_fn_data(self): windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing - def to_runner_api_parameter(self, context): + def _get_key_and_window_coder(self, named_inputs): +if named_inputs is None or not self._signature.is_stateful_dofn(): + return None, None +main_input = list(set(named_inputs.keys()) - set(self.side_inputs))[0] +input_pcoll = named_inputs[main_input] +kv_type_hint = input_pcoll.element_type +if kv_type_hint and kv_type_hint != typehints.Any: + coder = coders.registry.get_coder(kv_type_hint) + if not coder.is_kv_coder(): +raise ValueError( +'Input elements to the transform %s with stateful DoFn must be ' +'key-value pairs.' % self) + key_coder = coder.key_coder() +else: + key_coder = coders.registry.get_coder(typehints.Any) +window_coder = input_pcoll.windowing.windowfn.get_window_coder() +return key_coder, window_coder + + def to_runner_api(self, context, **extra_kwargs): +# type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec +has_parts = extra_kwargs.get('has_part', False) +urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs) Review comment: https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md?cl=head This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 419190) Time Spent: 16h 50m (was: 16h 40m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419199 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r406002550 ## File path: sdks/python/apache_beam/runners/worker/data_plane.py ## @@ -408,27 +470,67 @@ def close_callback(data): return ClosableOutputStream.create( close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def output_timer_stream(self, instruction_id, transform_id, timer_family_id): +def add_to_send_queue(timer): + if timer: +self._to_send.put( +beam_fn_api_pb2.Elements.Timer( +instruction_id=instruction_id, +transform_id=transform_id, +timer_family_id=timer_family_id, +timers=timer, +is_last=False)) + +def close_callback(timer): + add_to_send_queue(timer) + self._to_send.put( + beam_fn_api_pb2.Elements.Timer( + instruction_id=instruction_id, + transform_id=transform_id, + timer_family_id=timer_family_id, + timers=b'', + is_last=True)) + +return ClosableOutputStream.create( +close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) + def _write_outputs(self): # type: () -> Iterator[beam_fn_api_pb2.Elements] -done = False -while not done: - data = [self._to_send.get()] - try: -# Coalesce up to 100 other items. -for _ in range(100): - data.append(self._to_send.get_nowait()) - except queue.Empty: -pass - if data[-1] is self._WRITES_FINISHED: -done = True -data.pop() - if data: -yield beam_fn_api_pb2.Elements(data=data) +stream_done = False +while not stream_done: + streams = None + if not stream_done: Review comment: This will always be true (given the loop condition). This is an automated message from the Apache Git Service. 
Issue Time Tracking --- Worklog Id: (was: 419199) Time Spent: 17.5h (was: 17h 20m)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419192&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419192 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 07:21 Start Date: 09/Apr/20 07:21 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405990026 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -629,28 +628,28 @@ def __init__(self, self._transform_id = transform_id self._key_coder = key_coder self._window_coder = window_coder -self._timer_family_specs = timer_family_specs -self._timer_receivers = None # type: Optional[Dict[str, operations.ConsumerSet]] +# A mapping of {timer_family_id: OutputStream} +self._timer_output_streams = {} +self._timer_coders_impl = {} self._all_states = { } # type: Dict[tuple, userstate.AccumulatingRuntimeState] - def update_timer_receivers(self, receivers): -# type: (operations._TaggedReceivers) -> None - -"""TODO""" -self._timer_receivers = {} -for tag in self._timer_family_specs: - self._timer_receivers[tag] = receivers.pop(tag) + def add_timer_info(self, timer_family_id, output_stream, coder_impl): +self._timer_output_streams[timer_family_id] = output_stream +self._timer_coders_impl[timer_family_id] = coder_impl def get_timer( self, timer_spec, key, - window # type: windowed_value.BoundedWindow - ): + window, # type: windowed_value.BoundedWindow + pane): # type: (...) 
-> OutputTimer -assert self._timer_receivers is not None -return OutputTimer(key, window, self._timer_receivers[timer_spec.name]) +output_stream = self._timer_output_streams[timer_spec.name] Review comment: If this were a single map rather than two parallel maps, you could write something like `output_stream, timer_coder_impl = self._timer_info[timer_spec.name]` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419192) Time Spent: 17h 10m (was: 17h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 17h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
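Editorial sketch of the single-map layout the reviewer describes. The class is a hypothetical, trimmed version of the state context in the diff, and `lookup` stands in for the part of `get_timer` that fetches the stream and coder.

```python
class TimerRegistry:
    """Holds one map instead of parallel stream and coder maps."""
    def __init__(self):
        # A mapping of {timer_family_id: (output_stream, timer_coder_impl)}.
        self._timer_info = {}

    def add_timer_info(self, timer_family_id, output_stream, coder_impl):
        self._timer_info[timer_family_id] = (output_stream, coder_impl)

    def lookup(self, timer_family_id):
        # A single lookup, unpacked into both values at once.
        output_stream, timer_coder_impl = self._timer_info[timer_family_id]
        return output_stream, timer_coder_impl
```

With one map the two values cannot drift out of sync: a timer family either has both a stream and a coder registered or raises `KeyError` on lookup.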
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419166&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419166 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 06:55 Start Date: 09/Apr/20 06:55 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#issuecomment-611363247 The [test_pardo_timers_clear](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L351-L389) test fails with streaming Flink. The Python SDK sends all timers (hold_timestamp=-INF, the Python default behavior) but only gets the timer with timestamp=20 back. Given that the test only fails when streaming, it seems like something is not correct with the watermark(?). @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419166) Time Spent: 16.5h (was: 16h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 16.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419062 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 01:35 Start Date: 09/Apr/20 01:35 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405907504 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java ## @@ -238,9 +238,14 @@ protected BatchDataflowWorker( sdkFusedStage = pipeline == null ? RegisterNodeFunction.withoutPipeline( -idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor()) +idGenerator, +sdkHarnessRegistry.beamFnStateApiServiceDescriptor(), +sdkHarnessRegistry.beamFnDataApiServiceDescriptor()) Review comment: That is correct. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419062) Time Spent: 16h 20m (was: 16h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 16h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419043&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419043 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 00:40 Start Date: 09/Apr/20 00:40 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405892978 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -562,45 +562,45 @@ class OutputTimer(object): def __init__(self, key, window, # type: windowed_value.BoundedWindow - receiver # type: operations.ConsumerSet + paneinfo, + timer_family_id, + timer_coder_impl, + output_stream ): self._key = key self._window = window -self._receiver = receiver +self._paneinfo = paneinfo +self._timer_family_id = timer_family_id +self._output_stream = output_stream +self._timer_coder_impl = timer_coder_impl def set(self, ts): ts = timestamp.Timestamp.of(ts) -# TODO(BEAM-9562): Plumb through actual timer fields. -self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=ts, -hold_timestamp=ts, -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - ts, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=ts, +hold_timestamp=ts, +paneinfo=self._paneinfo) +self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True) +self._output_stream.maybe_flush() def clear(self): # type: () -> None dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000) -# TODO(BEAM-9562): Plumb through actual paneinfo. 
-self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=timestamp.Timestamp.of(clear_ts), -hold_timestamp=timestamp.Timestamp.of(0), -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - 0, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=clear_ts, Review comment: > (Should the coder be ignoring them as well?) No, the timer coder is encoding all of this info now. > Don't bother setting these timestamps, or paneinfo. Could you please explain more about this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419043) Time Spent: 16h 10m (was: 16h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 16h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419042 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 00:39 Start Date: 09/Apr/20 00:39 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405892978 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -562,45 +562,45 @@ class OutputTimer(object): def __init__(self, key, window, # type: windowed_value.BoundedWindow - receiver # type: operations.ConsumerSet + paneinfo, + timer_family_id, + timer_coder_impl, + output_stream ): self._key = key self._window = window -self._receiver = receiver +self._paneinfo = paneinfo +self._timer_family_id = timer_family_id +self._output_stream = output_stream +self._timer_coder_impl = timer_coder_impl def set(self, ts): ts = timestamp.Timestamp.of(ts) -# TODO(BEAM-9562): Plumb through actual timer fields. -self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=ts, -hold_timestamp=ts, -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - ts, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=ts, +hold_timestamp=ts, +paneinfo=self._paneinfo) +self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True) +self._output_stream.maybe_flush() def clear(self): # type: () -> None dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1 clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000) -# TODO(BEAM-9562): Plumb through actual paneinfo. 
-self._receiver.receive( -windowed_value.WindowedValue(( -self._key, -userstate.Timer( -user_key='', -dynamic_timer_tag='', -windows=(self._window, ), -clear_bit=False, -fire_timestamp=timestamp.Timestamp.of(clear_ts), -hold_timestamp=timestamp.Timestamp.of(0), -paneinfo=windowed_value.PANE_INFO_UNKNOWN)), - 0, (self._window, ))) +timer = userstate.Timer( +user_key=self._key, +dynamic_timer_tag='', +windows=(self._window, ), +clear_bit=False, +fire_timestamp=clear_ts, Review comment: > (Should the coder be ignoring them as well?) No, the timer coder is encoding all of this info now. > Don't bother setting these timestamps, or paneinfo. Could you please explain more about this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419042) Time Spent: 16h (was: 15h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 16h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419040 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 00:38 Start Date: 09/Apr/20 00:38 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405892635 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java ## @@ -134,14 +136,18 @@ public static RegisterNodeFunction forPipeline( * harnesses, then this method should be removed. */ public static RegisterNodeFunction withoutPipeline( - IdGenerator idGenerator, Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) { -return new RegisterNodeFunction(null, idGenerator, stateApiServiceDescriptor); + IdGenerator idGenerator, + Endpoints.ApiServiceDescriptor stateApiServiceDescriptor, + Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) { +return new RegisterNodeFunction( +null, idGenerator, stateApiServiceDescriptor, timerApiServiceDescriptor); } private RegisterNodeFunction( @Nullable RunnerApi.Pipeline pipeline, IdGenerator idGenerator, - Endpoints.ApiServiceDescriptor stateApiServiceDescriptor) { + Endpoints.ApiServiceDescriptor stateApiServiceDescriptor, + Endpoints.ApiServiceDescriptor timerApiServiceDescriptor) { Review comment: timerApiServiceDescriptor isn't used? Should it be stored and written to the ProcessBundleDescriptor? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419040) Time Spent: 15h 50m (was: 15h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=419034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-419034 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 09/Apr/20 00:29 Start Date: 09/Apr/20 00:29 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405890215 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java ## @@ -238,9 +238,14 @@ protected BatchDataflowWorker( sdkFusedStage = pipeline == null ? RegisterNodeFunction.withoutPipeline( -idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor()) +idGenerator, +sdkHarnessRegistry.beamFnStateApiServiceDescriptor(), +sdkHarnessRegistry.beamFnDataApiServiceDescriptor()) Review comment: I see. So we only have a separate timer_api_service_descriptor in the protos so that a runner has the option to make it separate, but it doesn't need to be separate? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 419034) Time Spent: 15h 40m (was: 15.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418986 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 23:32 Start Date: 08/Apr/20 23:32 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405873264 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -914,6 +923,7 @@ public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, + BeamFnTimerClient beamFnTimerClient, String pTransformId, PTransform pTransform, Supplier processBundleInstructionId, Review comment: for completeness yes This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418986) Time Spent: 15.5h (was: 15h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418968&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418968 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 23:25 Start Date: 08/Apr/20 23:25 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405870882 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java ## @@ -238,9 +238,14 @@ protected BatchDataflowWorker( sdkFusedStage = pipeline == null ? RegisterNodeFunction.withoutPipeline( -idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor()) +idGenerator, +sdkHarnessRegistry.beamFnStateApiServiceDescriptor(), +sdkHarnessRegistry.beamFnDataApiServiceDescriptor()) Review comment: They both use the Data API so no. All we're saying here is that we will re-use the same gRPC channel for both timers and data. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418968) Time Spent: 15h 20m (was: 15h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
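Editorial illustration of the review comment above (timers and data re-using one channel): a minimal Python sketch, not code from the PR. The names `Element`, `SharedChannel` and `split_by_kind` are hypothetical, and the sketch assumes that a timer element can be told apart from a data element by a timer family id; the real Fn API messages may be shaped differently.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Element:
    """Hypothetical message; timer_family_id is None for ordinary data."""
    transform_id: str
    payload: bytes
    timer_family_id: Optional[str] = None


class SharedChannel:
    """Hypothetical single channel that carries both data and timers."""

    def __init__(self) -> None:
        self.sent: List[Element] = []

    def send_data(self, transform_id: str, payload: bytes) -> None:
        self.sent.append(Element(transform_id, payload))

    def send_timer(
        self, transform_id: str, timer_family_id: str, payload: bytes
    ) -> None:
        self.sent.append(Element(transform_id, payload, timer_family_id))


def split_by_kind(
    elements: List[Element],
) -> Tuple[List[Element], List[Element]]:
    """Separate what arrived on the shared channel into (data, timers)."""
    data = [e for e in elements if e.timer_family_id is None]
    timers = [e for e in elements if e.timer_family_id is not None]
    return data, timers
```

Under these assumptions only one service descriptor has to be handed to the SDK harness, which matches the diff passing the data API descriptor where a timer API descriptor is expected.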
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418967 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 23:24 Start Date: 08/Apr/20 23:24 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405870539 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo( restrictionCoderId = ""; } +Coder windowCoder = +(Coder) mainInput.getWindowingStrategy().getWindowFn().windowCoder(); +Coder keyCoder; +if (signature.usesState() || signature.usesTimers()) { + checkArgument( + mainInput.getCoder() instanceof KvCoder, + "DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s", + mainInput.getCoder()); Review comment: It was being covered by validation in DoFnSignatures but it is being repeated here for defense in depth reasons. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418967) Time Spent: 15h 10m (was: 15h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418956 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 22:53 Start Date: 08/Apr/20 22:53 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405823916 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -914,6 +923,7 @@ public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, + BeamFnTimerClient beamFnTimerClient, String pTransformId, PTransform pTransform, Supplier processBundleInstructionId, Review comment: This test is `testStateCallsFailIfNoStateApiServiceDescriptorSpecified`. Is there value in a `testTimerCallsFailIfNoTimerApiServiceDescriptorSpecified` to exercise the new "Timers are unsupported because ..." exception? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418956) Time Spent: 15h (was: 14h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418955 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 22:53 Start Date: 08/Apr/20 22:53 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405855696 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java ## @@ -238,9 +238,14 @@ protected BatchDataflowWorker( sdkFusedStage = pipeline == null ? RegisterNodeFunction.withoutPipeline( -idGenerator, sdkHarnessRegistry.beamFnStateApiServiceDescriptor()) +idGenerator, +sdkHarnessRegistry.beamFnStateApiServiceDescriptor(), +sdkHarnessRegistry.beamFnDataApiServiceDescriptor()) Review comment: Isn't the timer API service descriptor different from the data API service descriptor? Does that need to be plumbed through SdkHarnessRegistry and used here instead of the data API descriptor? (same question below and in streaming worker) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418955) Time Spent: 15h (was: 14h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418954&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418954 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 22:53 Start Date: 08/Apr/20 22:53 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405831920 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -257,6 +230,19 @@ public static ParDoPayload translateParDo( restrictionCoderId = ""; } +Coder windowCoder = +(Coder) mainInput.getWindowingStrategy().getWindowFn().windowCoder(); +Coder keyCoder; +if (signature.usesState() || signature.usesTimers()) { + checkArgument( + mainInput.getCoder() instanceof KvCoder, + "DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s", + mainInput.getCoder()); Review comment: Just curious: did we not have this check before, and just failed when attempting to cast to KVCoder (in the removed block from `translate` above)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418954) Time Spent: 15h (was: 14h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 15h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418950 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 22:35 Start Date: 08/Apr/20 22:35 Worklog Time Spent: 10m Work Description: pabloem commented on issue #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#issuecomment-611229498 I'm sorry. I am having a heavy headache. I'll bow out. @robertwb can you review fn_runner.py and siblings? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418950) Time Spent: 14h 50m (was: 14h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418907&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418907 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405815160 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -562,45 +562,45 @@ class OutputTimer(object): def __init__(self, key, window, # type: windowed_value.BoundedWindow - receiver # type: operations.ConsumerSet + paneinfo, + timer_family_id, + timer_coder_impl, + output_stream Review comment: A type on this parameter would be useful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418907) Time Spent: 14h (was: 13h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418908&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418908 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405817899 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -611,7 +611,7 @@ def __init__(self, transform_id, # type: str key_coder, # type: coders.Coder window_coder, # type: coders.Coder - timer_family_specs # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec] + timer_coders Review comment: type? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418908) Time Spent: 14h 10m (was: 14h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418913&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418913 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405825392 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -846,6 +870,30 @@ def process_bundle(self, instruction_id): data_channels[input_op.data_channel].append(input_op.transform_id) input_op_by_transform_id[input_op.transform_id] = input_op +# Set up timer output stream + timer_output_streams = {} + for transform_id, timer_list in self.timer_ids.items(): +output_streams = {} +for timer_id in timer_list: + output_streams[ + timer_id] = self.timer_data_channel.output_timer_stream( + instruction_id, transform_id, timer_id) + timer_output_streams[transform_id] = output_streams +self.process_timer_ops[ +transform_id].user_state_context.update_timer_output_streams( Review comment: Nit: rather than this double nesting, it might simplify things to have an `update_timer_output_streams(timer_id, output_stream)` method that could be called repeatedly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418913) Time Spent: 14h 40m (was: 14.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
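Editorial illustration of the nit in the review comment above: a minimal Python sketch of what a per-timer `update_timer_output_streams(timer_id, output_stream)` call might look like. Only that method name and `output_timer_stream(instruction_id, transform_id, timer_id)` come from the thread; `FakeTimerDataChannel`, `FakeUserStateContext` and `set_up_timer_output_streams` are hypothetical stand-ins, not the classes in `bundle_processor.py`.

```python
from typing import Any, Dict, List, Tuple


class FakeTimerDataChannel:
    """Hypothetical stand-in for the timer data channel in the diff."""

    def output_timer_stream(
        self, instruction_id: str, transform_id: str, timer_id: str
    ) -> Tuple[str, str, str]:
        # A real channel would return a writable stream; a tuple that
        # records the arguments is enough for this illustration.
        return (instruction_id, transform_id, timer_id)


class FakeUserStateContext:
    """Hypothetical context exposing the per-timer update method."""

    def __init__(self) -> None:
        self.timer_output_streams: Dict[str, Any] = {}

    def update_timer_output_streams(
        self, timer_id: str, output_stream: Any
    ) -> None:
        # Called once per timer, so callers need no intermediate dict.
        self.timer_output_streams[timer_id] = output_stream


def set_up_timer_output_streams(
    instruction_id: str,
    timer_ids: Dict[str, List[str]],
    contexts: Dict[str, FakeUserStateContext],
    channel: FakeTimerDataChannel,
) -> None:
    """Register one output stream per (transform, timer) pair."""
    for transform_id, timer_list in timer_ids.items():
        context = contexts[transform_id]
        for timer_id in timer_list:
            context.update_timer_output_streams(
                timer_id,
                channel.output_timer_stream(
                    instruction_id, transform_id, timer_id))
```

Compared with the diff, this drops the intermediate `output_streams` and `timer_output_streams` dictionaries; whether that fits the surrounding code is a judgment for the PR author.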
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418909 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405817317
## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl

   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()

   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
Review comment: (Should the coder be ignoring them as well?)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418909) Time Spent: 14h 10m (was: 14h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418912&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418912 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405826837
## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -846,6 +870,30 @@ def process_bundle(self, instruction_id):
         data_channels[input_op.data_channel].append(input_op.transform_id)
         input_op_by_transform_id[input_op.transform_id] = input_op

+        # Set up timer output stream
+      timer_output_streams = {}
+      for transform_id, timer_list in self.timer_ids.items():
+        output_streams = {}
+        for timer_id in timer_list:
+          output_streams[
+              timer_id] = self.timer_data_channel.output_timer_stream(
+                  instruction_id, transform_id, timer_id)
+          timer_output_streams[transform_id] = output_streams
+        self.process_timer_ops[
+            transform_id].user_state_context.update_timer_output_streams(
+                output_streams)
+
+      # Process timers
+      if self.timer_data_channel:
Review comment: We can't safely assume the runner will finish sending all timers before sending any of the data (and the buffer may get full, resulting in a deadlock). I think we need to have a data_channel.inputs() that returns both data and timers and then branch in the loop.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418912) Time Spent: 14.5h (was: 14h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
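Editorial sketch of the review comment above about reading data and timers from one input and branching in the loop. This is only an illustration under stated assumptions: `DataElement`, `TimerElement`, and `process_inputs` are hypothetical names, not Beam classes or the code that was eventually merged. The point is that a single iterator is consumed in arrival order, so neither kind of input has to be fully drained before the other.

```python
from collections import namedtuple

# Hypothetical stand-ins for items read off one multiplexed channel.
# These are NOT Beam classes; they only illustrate the branching idea.
DataElement = namedtuple('DataElement', ['transform_id', 'payload'])
TimerElement = namedtuple(
    'TimerElement', ['transform_id', 'timer_family_id', 'payload'])


def process_inputs(inputs, data_handlers, timer_handlers):
  """Consumes data and timers in arrival order from a single iterator."""
  for element in inputs:
    if isinstance(element, TimerElement):
      # Timers are dispatched by (transform_id, timer_family_id).
      key = (element.transform_id, element.timer_family_id)
      timer_handlers[key](element.payload)
    else:
      # Everything else is treated as ordinary data for a transform.
      data_handlers[element.transform_id](element.payload)


seen = []
process_inputs(
    [
        DataElement('t1', b'a'),
        TimerElement('t1', 'tf', b'timer'),
        DataElement('t1', b'b'),
    ],
    data_handlers={'t1': lambda p: seen.append(('data', p))},
    timer_handlers={('t1', 'tf'): lambda p: seen.append(('timer', p))})
```

Because the loop never waits for "all timers" or "all data" first, interleaved input is handled as it arrives, which is the property the reviewer asks for.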
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418910&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418910 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405816460
## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -562,45 +562,45 @@ class OutputTimer(object):
   def __init__(self,
                key,
                window,  # type: windowed_value.BoundedWindow
-               receiver  # type: operations.ConsumerSet
+               paneinfo,
+               timer_family_id,
+               timer_coder_impl,
+               output_stream
               ):
     self._key = key
     self._window = window
-    self._receiver = receiver
+    self._paneinfo = paneinfo
+    self._timer_family_id = timer_family_id
+    self._output_stream = output_stream
+    self._timer_coder_impl = timer_coder_impl

   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
-    # TODO(BEAM-9562): Plumb through actual timer fields.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=ts,
-                hold_timestamp=ts,
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     ts, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=ts,
+        hold_timestamp=ts,
+        paneinfo=self._paneinfo)
+    self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
+    self._output_stream.maybe_flush()

   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
-    # TODO(BEAM-9562): Plumb through actual paneinfo.
-    self._receiver.receive(
-        windowed_value.WindowedValue((
-            self._key,
-            userstate.Timer(
-                user_key='',
-                dynamic_timer_tag='',
-                windows=(self._window, ),
-                clear_bit=False,
-                fire_timestamp=timestamp.Timestamp.of(clear_ts),
-                hold_timestamp=timestamp.Timestamp.of(0),
-                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
-                                     0, (self._window, )))
+    timer = userstate.Timer(
+        user_key=self._key,
+        dynamic_timer_tag='',
+        windows=(self._window, ),
+        clear_bit=False,
+        fire_timestamp=clear_ts,
Review comment: Don't bother setting these timestamps, or paneinfo.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418910) Time Spent: 14h 20m (was: 14h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
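Editorial sketch of what "don't bother setting these timestamps, or paneinfo" could look like for a cleared timer. It assumes a cleared timer sets its clear bit and leaves the remaining fields unset, which mirrors the Java `Timer.cleared` factory quoted elsewhere in this thread. The `Timer` namedtuple and `cleared_timer` helper below are hypothetical stand-ins, not `userstate.Timer` itself.

```python
from collections import namedtuple

# Hypothetical stand-in with the same field names used in the quoted diff.
Timer = namedtuple(
    'Timer',
    [
        'user_key',
        'dynamic_timer_tag',
        'windows',
        'clear_bit',
        'fire_timestamp',
        'hold_timestamp',
        'paneinfo',
    ])


def cleared_timer(user_key, windows, dynamic_timer_tag=''):
  """Builds a cleared timer: only key, tag and windows carry information."""
  return Timer(
      user_key=user_key,
      dynamic_timer_tag=dynamic_timer_tag,
      windows=tuple(windows),
      clear_bit=True,
      # Left unset on purpose; a cleared timer has nothing to schedule.
      fire_timestamp=None,
      hold_timestamp=None,
      paneinfo=None)


example = cleared_timer('some-key', ['some-window'])
```

With this shape there is no need for a dummy timestamp past the maximum timestamp; the clear bit alone signals the intent.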
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=418911&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-418911 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 08/Apr/20 21:33 Start Date: 08/Apr/20 21:33 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11314: [BEAM-9562] Send Timers over Data Channel as Elements URL: https://github.com/apache/beam/pull/11314#discussion_r405824290
## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -1088,6 +1142,30 @@ def create_operation(self,
         transform_proto.spec.payload, parameter_type)
     return creator(self, transform_id, transform_proto, payload, consumers)

+  def get_timer_coders(self):
+    timer_coder = {}
+    for transform_id, transform_proto in self.descriptor.transforms.items():
Review comment: I see us doing this loop three times now. Perhaps it would be more useful to do the loop once to set everything up, creating a single dictionary (transform_id, timer_family_id) -> (all info about that timer we need to dispatch them).
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 418911) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness, sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Fix For: 2.21.0 > > Time Spent: 14h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
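Editorial sketch of the "loop once, build a single dictionary" suggestion in the review comment above. The descriptor here is a plain dict and the field names (`timer_family_specs`, `coder_id`, `time_domain`) and the `TimerInfo` tuple are assumptions made for illustration; the real bundle descriptor is a protobuf and may carry different information.

```python
from collections import namedtuple

# Hypothetical bundle of per-timer information needed for dispatch.
TimerInfo = namedtuple('TimerInfo', ['coder_id', 'time_domain'])


def build_timer_index(transforms):
  """Walks the transforms once and indexes timers by (transform, family)."""
  index = {}
  for transform_id, transform in transforms.items():
    specs = transform.get('timer_family_specs', {})
    for timer_family_id, spec in specs.items():
      index[(transform_id, timer_family_id)] = TimerInfo(
          coder_id=spec['coder_id'], time_domain=spec['time_domain'])
  return index


# A toy descriptor: one stateful transform with one timer family,
# and one transform without timers.
timer_index = build_timer_index({
    'pardo_1': {
        'timer_family_specs': {
            'expiry': {'coder_id': 'c1', 'time_domain': 'EVENT_TIME'},
        },
    },
    'map_2': {},
})
```

Later lookups for coders, output streams, or dispatch can then share this one index instead of repeating the loop over all transforms.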
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415580&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415580 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 16:33 Start Date: 03/Apr/20 16:33 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#issuecomment-608539461 All tests passed. I'm going to merge it. Thanks, everyone! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415580) Time Spent: 13h 40m (was: 13.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 13h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415582&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415582 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 16:33 Start Date: 03/Apr/20 16:33 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415582) Time Spent: 13h 50m (was: 13h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 13h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415558 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 15:48 Start Date: 03/Apr/20 15:48 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r403099609 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { +// Hash only the millis of the timestamp to be consistent with equals +if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); +} +return Objects.hash( +getUserKey(), +getDynamicTimerTag(), +getClearBit(), +getFireTimestamp().getMillis(), +getHoldTimestamp().getMillis(), +getWindows(), +getPane()); Review comment: good point This is
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415322 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 05:44 Start Date: 03/Apr/20 05:44 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#issuecomment-608241577 Run Java_Examples_Dataflow PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415322) Time Spent: 13h 20m (was: 13h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 13h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415313 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 04:57 Start Date: 03/Apr/20 04:57 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#issuecomment-608229553 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415313) Time Spent: 13h 10m (was: 13h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 13h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415298 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 03:13 Start Date: 03/Apr/20 03:13 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402713697 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { +// Hash only the millis of the timestamp to be consistent with equals +if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); +} +return Objects.hash( +getUserKey(), +getDynamicTimerTag(), +getClearBit(), +getFireTimestamp().getMillis(), +getHoldTimestamp().getMillis(), +getWindows(), +getPane()); Review comment: `getFireTimestamp()` returns `null` when `clearBit` is `true`. I think we still need
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415274 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402692210
## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ##
@@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {

-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static Timer of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }

-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static Timer of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static Timer cleared(
+      T userKey, String dynamicTimerTag, Collection windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }

+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
Review comment:
```suggestion
  /** Returns the windows which are associated with the timer. */
```
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415274) Time Spent: 12h 50m (was: 12h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415267 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402688089
## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ##
@@ -40,92 +52,201 @@
 @AutoValue
 public abstract class Timer {

-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static Timer of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }

-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static Timer of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */
+  public static Timer cleared(
+      T userKey, String dynamicTimerTag, Collection windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }

+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
+  /**
+   * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which is associated with the timer. */
+  public abstract Collection getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * The time is relative to the time domain defined in the {@link
+   * The time is absolute to the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();

-  /** A user supplied payload. */
+  /**
+   * Returns the watermark that the timer is supposed to be held. This field is nullable only when
+   * the timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract Instant getHoldTimestamp();
+
+  /**
+   * Returns the paneinfo that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
Review comment:
```suggestion
   * Returns the {@link PaneInfo} that is related to the timer. This field is nullable only when the
   * timer is being cleared.
```
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415267) Time Spent: 12h 10m (was: 12h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415265 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402687712
## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ##
@@ -21,17 +21,29 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.joda.time.Instant;

 /**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a user key, a dynamic timer tag and either a bit that says that this timer
+ * should be cleared or data representing the firing timestamp, hold timestamp and a list of windows
+ * and pane information that should be used when producing output.
Review comment:
```suggestion
 * A timer consists of a user key, a dynamic timer tag, a set of windows and either a bit that says that this timer
 * should be cleared or data representing the firing timestamp, hold timestamp and pane information that should be used when producing output.
```
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415265) Time Spent: 12h (was: 11h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415271 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402689183 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { +// Hash only the millis of the timestamp to be consistent with equals +if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); +} +return Objects.hash( +getUserKey(), +getDynamicTimerTag(), +getClearBit(), +getFireTimestamp().getMillis(), +getHoldTimestamp().getMillis(), +getWindows(), +getPane()); + } /** * A {@link org.apache.beam.sdk.coders.Coder} for timers. * - * This coder is determi
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415272 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402688977 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { +// Hash only the millis of the timestamp to be consistent with equals +if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); +} Review comment: The hash is still stable with null objects. ```suggestion ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about
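The review comment above relies on `java.util.Objects.hash` accepting `null` arguments, which is why the special case for cleared timers can be dropped. A minimal JDK-only sketch of that behaviour follows; the class `NullHashSketch` and its `hashOf` helper are hypothetical illustrations and are not part of the PR.

```java
import java.util.Objects;

// Hypothetical demo class; not part of the Beam PR.
final class NullHashSketch {
  // Hashes a fixed set of fields, some of which may be null
  // (as the timestamps are for a cleared timer).
  static int hashOf(
      Object userKey, String tag, boolean clearBit, Object fireTimestamp, Object holdTimestamp) {
    // Objects.hash delegates to Arrays.hashCode, which counts a null element as 0
    // instead of throwing a NullPointerException.
    return Objects.hash(userKey, tag, clearBit, fireTimestamp, holdTimestamp);
  }

  public static void main(String[] args) {
    int first = hashOf("key", "tag", true, null, null);
    int second = hashOf("key", "tag", true, null, null);
    System.out.println(first == second); // prints true
  }
}
```

Because null fields contribute a fixed value, equal timers still produce equal hashes without a separate branch for the cleared case.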
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415269 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402688769 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } Review comment: This will make the method less error prone in case the contract changes. ```suggestion return Objects.equals(this.getUserKey(), that.getUserKey()) && Objects.equals(this.getDynamicTimerTag(), that.getDynamicTimerTag()) && Objects.equals(this.getWindows(), that.getWindows()) && (this.getClearBit() == that.getClearBit()) && Objects.equals(this.getFireTimestamp(), that.getFireTimestamp()) && Objects.equals(this.getHoldTimestamp(), that.getHoldTimestamp()) && Objects.equals(this.get
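The suggestion above replaces direct `.equals(...)` calls with `Objects.equals(...)`, so a single comparison chain works whether or not the nullable fields are set. A minimal sketch of that pattern follows, assuming a simplified stand-in class: `TimerSketch` and its fields are hypothetical, use plain `Long` millis instead of `Instant`, and omit windows and pane information.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for the Timer value class under review.
final class TimerSketch {
  private final String userKey;
  private final String dynamicTimerTag;
  private final boolean clearBit;
  private final Long fireTimestampMillis; // null when the timer is cleared
  private final Long holdTimestampMillis; // null when the timer is cleared

  TimerSketch(
      String userKey,
      String dynamicTimerTag,
      boolean clearBit,
      Long fireTimestampMillis,
      Long holdTimestampMillis) {
    this.userKey = userKey;
    this.dynamicTimerTag = dynamicTimerTag;
    this.clearBit = clearBit;
    this.fireTimestampMillis = fireTimestampMillis;
    this.holdTimestampMillis = holdTimestampMillis;
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof TimerSketch)) {
      return false;
    }
    TimerSketch that = (TimerSketch) other;
    // Objects.equals is null-safe, so no branch on clearBit is needed.
    return Objects.equals(this.userKey, that.userKey)
        && Objects.equals(this.dynamicTimerTag, that.dynamicTimerTag)
        && this.clearBit == that.clearBit
        && Objects.equals(this.fireTimestampMillis, that.fireTimestampMillis)
        && Objects.equals(this.holdTimestampMillis, that.holdTimestampMillis);
  }

  @Override
  public int hashCode() {
    // Hash exactly the fields compared in equals to keep the two consistent.
    return Objects.hash(
        userKey, dynamicTimerTag, clearBit, fireTimestampMillis, holdTimestampMillis);
  }
}
```

With this shape, a later change to which fields may be null does not require touching `equals` or `hashCode`, which is the "less error prone" property the reviewer mentions.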
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415273&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415273 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402692551 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java ## @@ -33,42 +35,128 @@ /** Tests for {@link Timer}. */ @RunWith(JUnit4.class) public class TimerTest { - private static final Instant INSTANT = Instant.now(); + private static final Instant FIREINSTANT = new Instant(123L); + private static final Instant HOLDINSTANT = new Instant(456L); Review comment: ```suggestion private static final Instant FIRE_TIME = new Instant(123L); private static final Instant HOLD_TIME = new Instant(456L); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415273) Time Spent: 12h 40m (was: 12.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415268&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415268 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402687822 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
Review comment: ```suggestion * Returns the tag that the timer is set on. The tag is {@code ""} when the timer is for a {@link * TimerSpec}. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415268) Time Spent: 12h 20m (was: 12h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415266 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402687607 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ Review comment: ```suggestion /** Returns a cleared timer for the given {@code userKey}, {@code dynamicTimerTag} and {@code windows}. */ ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415266) Time Spent: 12h (was: 11h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 12h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415270 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:55 Start Date: 03/Apr/20 01:55 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402689030 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,92 +52,201 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code + * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}. + */ + public static Timer of( + T userKey, + String dynamicTimerTag, + Collection windows, + Instant fireTimestamp, + Instant holdTimestamp, + PaneInfo pane) { +return new AutoValue_Timer( +userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane); } - /** Returns a timer for the given timestamp with a user specified payload. */ - public static Timer of(Instant timestamp, @Nullable T payload) { -return new AutoValue_Timer(timestamp, payload); + /** Returns a cleared timer for the given userKey, dynamicTimerTag and windows. */ + public static Timer cleared( + T userKey, String dynamicTimerTag, Collection windows) { +return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); } + /** Returns the key that the timer is set on. */ + public abstract T getUserKey(); + + /** + * Returns the tag that the timer is set on. This tag is {@code ""} when the timer is for a {@link + * TimerSpec}. 
+ */ + public abstract String getDynamicTimerTag(); + + /** Returns the windows which is associated with the timer. */ + public abstract Collection getWindows(); + + /** Returns whether the timer is going to be cleared. */ + public abstract boolean getClearBit(); + /** - * Returns the timestamp of when the timer is scheduled to fire. + * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when + * the timer is being cleared. * - * The time is relative to the time domain defined in the {@link + * The time is absolute to the time domain defined in the {@link * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract Instant getFireTimestamp(); - /** A user supplied payload. */ + /** + * Returns the watermark that the timer is supposed to be held. This field is nullable only when + * the timer is being cleared. + */ @Nullable - public abstract T getPayload(); + public abstract Instant getHoldTimestamp(); + + /** + * Returns the paneinfo that is related to the timer. This field is nullable only when the + * timer is being cleared. 
+ */ + @Nullable + public abstract PaneInfo getPane(); + + @Override + public boolean equals(Object other) { +if (!(other instanceof Timer)) { + return false; +} +Timer that = (Timer) other; +if (this.getClearBit()) { + return Objects.equals(this.getUserKey(), that.getUserKey()) + && this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) + && (this.getClearBit() == that.getClearBit()) + && Objects.equals(this.getWindows(), that.getWindows()); +} +return Objects.equals(this.getUserKey(), that.getUserKey()) +&& this.getDynamicTimerTag().equals(that.getDynamicTimerTag()) +&& (this.getClearBit() == that.getClearBit()) +&& this.getFireTimestamp().equals(that.getFireTimestamp()) +&& this.getHoldTimestamp().equals(that.getHoldTimestamp()) +&& Objects.equals(this.getWindows(), that.getWindows()) +&& Objects.equals(this.getPane(), that.getPane()); + } + + @Override + public int hashCode() { +// Hash only the millis of the timestamp to be consistent with equals +if (getClearBit()) { + return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows()); +} +return Objects.hash( +getUserKey(), +getDynamicTimerTag(), +getClearBit(), +getFireTimestamp().getMillis(), +getHoldTimestamp().getMillis(), +getWindows(), +getPane()); Review comment: ```suggestion return Objects.hash( getUserKey(), getDynam
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415253 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 03/Apr/20 01:22 Start Date: 03/Apr/20 01:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r402685419 ## File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml ## @@ -28,6 +28,23 @@ # one of a few standard JSON types such as numbers, strings, dicts that map naturally # to the type encoded by the coder. # +# Java code snippet to generate example bytes: +# Coder> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); +# Instant now = new Instant(1000L); +# Timer timer = Timer.of( +# "key", +# "tag", +# now, +# now, +# Collections.singletonList(GlobalWindow.INSTANCE), +# PaneInfo.NO_FIRING); +# byte[] byets = CoderUtils.encodeToByteArray(coder, timer); +# String str = new String(byets, java.nio.charset.StandardCharsets.ISO_8859_1); Review comment: sg This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415253) Time Spent: 11h 50m (was: 11h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 11h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
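The snippet quoted above converts the encoded bytes to a `String` with `ISO_8859_1`. The PR does not state why, but one property of that charset is that it maps each byte value 0 to 255 to exactly one character, so arbitrary bytes survive a round trip through a `String`. A JDK-only sketch of that property follows; the class and helper names are hypothetical, and it does not use the Beam `Timer` coder itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper; illustrates only the charset round trip, not the Beam coder.
final class Latin1RoundTripSketch {
  // Interprets every byte as the character with the same code point (0..255).
  static String toLatin1String(byte[] bytes) {
    return new String(bytes, StandardCharsets.ISO_8859_1);
  }

  // Reverses toLatin1String for strings whose characters are all in 0..255.
  static byte[] fromLatin1String(String text) {
    return text.getBytes(StandardCharsets.ISO_8859_1);
  }

  public static void main(String[] args) {
    byte[] original = {0x00, 0x7f, (byte) 0x80, (byte) 0xff};
    byte[] restored = fromLatin1String(toLatin1String(original));
    System.out.println(Arrays.equals(original, restored)); // prints true
  }
}
```

A multi-byte charset such as UTF-8 would not give this guarantee for arbitrary encoder output, because invalid byte sequences are replaced during decoding.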
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=415007&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415007 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 02/Apr/20 19:45 Start Date: 02/Apr/20 19:45 Worklog Time Spent: 10m Work Description: lostluck commented on issue #11297: [BEAM-9562] Update missed TimerSpec conversion in Go SDK URL: https://github.com/apache/beam/pull/11297#issuecomment-608066725 We don't, that's utility code. It'll need to be fixed up properly for any missing/new fields later, after stability is hit, but only for new-to-the-SDK features like Timer families. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 415007) Time Spent: 11h 40m (was: 11.5h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 11h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=414996&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-414996 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 02/Apr/20 19:39 Start Date: 02/Apr/20 19:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11297: [BEAM-9562] Update missed TimerSpec conversion in Go SDK URL: https://github.com/apache/beam/pull/11297 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 414996) Time Spent: 11.5h (was: 11h 20m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 11.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)