[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411636 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 06:24
Start Date: 28/Mar/20 06:24
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605403432

Run Java PreCommit

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 411636)
Time Spent: 5h 50m (was: 5h 40m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of
> window 2020-03-19T17:59:59.999Z
>     org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
>     org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
>     .processElement(???.scala:187)
> {code}
>
> I think the regression was introduced in commit
> a005fd765a762183ca88df90f261f6d4a20cf3e0. Also note that the error message
> is wrong: it says "event time timer", but the timer is in the processing
> time domain.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
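
The behaviour reported in BEAM-9557 can be illustrated with a small, self-contained Java sketch. It is not the code in SimpleDoFnRunner and not the change made in pull request #11252; the class `TimerBoundarySketch`, its `TimeDomain` enum, and the methods `isAllowed` and `verify` are hypothetical names introduced only for illustration. The sketch assumes the "previous" behaviour described in the report: only event time timers are checked against the window expiration, while a processing time timer set past the end of the window is accepted. It also builds the error text from the timer's actual domain, which is the second point the reporter raises.

```java
import java.time.Instant;

/** Illustrative only: a hypothetical stand-in, not Beam's SimpleDoFnRunner. */
final class TimerBoundarySketch {

    /** Hypothetical enum naming the two time domains mentioned in the report. */
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    /**
     * Assumption: only event time timers are bounded by the window expiration.
     * A processing time timer past the end of the window is accepted.
     */
    static boolean isAllowed(TimeDomain domain, Instant target, Instant windowExpiration) {
        if (domain == TimeDomain.PROCESSING_TIME) {
            return true;
        }
        return !target.isAfter(windowExpiration);
    }

    /** Throws if the timer is not allowed, naming the timer's real domain in the message. */
    static void verify(TimeDomain domain, Instant target, Instant windowExpiration) {
        if (!isAllowed(domain, target, windowExpiration)) {
            String label = domain == TimeDomain.EVENT_TIME ? "event time" : "processing time";
            throw new IllegalArgumentException(
                "Attempted to set " + label + " timer for " + target
                    + " but that is after the expiration of window " + windowExpiration);
        }
    }

    public static void main(String[] args) {
        // Timestamps taken from the exception message quoted in the issue.
        Instant windowExpiration = Instant.parse("2020-03-19T17:59:59.999Z");
        Instant target = Instant.parse("2020-03-19T18:01:35.000Z");

        // Accepted under the assumption above.
        verify(TimeDomain.PROCESSING_TIME, target, windowExpiration);
        System.out.println("processing time timer accepted");

        // Rejected: an event time timer may not fire after the window expires.
        try {
            verify(TimeDomain.EVENT_TIME, target, windowExpiration);
        } catch (IllegalArgumentException e) {
            System.out.println("event time timer rejected: " + e.getMessage());
        }
    }
}
```

Splitting the decision (`isAllowed`) from the exception (`verify`) keeps the domain check in one place, so the message cannot drift from the rule that produced it.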
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411633 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 06:01
Start Date: 28/Mar/20 06:01
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401784

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411633)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411631&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411631 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401409

Run PythonFormatter PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411631)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411630&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411630 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401405

Run Java_Examples_Dataflow PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411630)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411629 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401396

Run CommunityMetrics PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411629)
Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411632 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401420

Run PythonLint PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411632)
Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411628 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401312

Run Website_Stage_GCS PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411628)
Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411624&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411624 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401231

Run JavaPortabilityApi PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411624)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411626 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401262

Run JavaBeamZetaSQL PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411626)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411625 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401246

Run Go PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411625)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411627&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411627 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401285

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411627)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411623 ]

ASF GitHub Bot logged work on BEAM-9557:

Author: ASF GitHub Bot
Created on: 28/Mar/20 05:53
Start Date: 28/Mar/20 05:53
Worklog Time Spent: 10m
Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401221

Run Website PreCommit

Issue Time Tracking
---
Worklog Id: (was: 411623)
Time Spent: 4h (was: 3h 50m)
[jira] [Updated] (BEAM-9198) BeamSQL aggregation analytics functionality
[ https://issues.apache.org/jira/browse/BEAM-9198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Wang updated BEAM-9198:
---
Description:
Mentor email: ruw...@google.com. Feel free to send emails for your questions.

Project Information
---
BeamSQL has a long list of aggregation/aggregation analytics functionalities to support.

To begin with, you will need to support this syntax:
{code:sql}
analytic_function_name ( [ argument_list ] )
  OVER (
    [ PARTITION BY partition_expression_list ]
    [ ORDER BY expression [{ ASC | DESC }] [, ...] ]
    [ window_frame_clause ]
  )
{code}
As there is a long list of analytics functions, a good starting point is to support rank() first.

This will require touching core components of BeamSQL:
1. SQL parser to support the syntax above.
2. SQL core to implement physical relational operator.
3. Distributed algorithms to implement a list of functions in a distributed manner.
4. Enable in ZetaSQL dialect.

To understand what SQL analytics functionality is, you could check this great explanation doc: https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts.

To learn about Beam's programming model, check: https://beam.apache.org/documentation/programming-guide/#overview

was:
Mentor email: ruw...@google.com. Feel free to send emails for your questions.

Project Information
---
BeamSQL has a long list of aggregation/aggregation analytics functionalities to support.

To begin with, you will need to support this syntax:
{code:sql}
analytic_function_name ( [ argument_list ] )
  OVER (
    [ PARTITION BY partition_expression_list ]
    [ ORDER BY expression [{ ASC | DESC }] [, ...] ]
    [ window_frame_clause ]
  )
{code}
As there is a long list of analytics functions, a good starting point is to support rank() first.

This will require touching core components of BeamSQL:
1. SQL parser to support the syntax above.
2. SQL core to implement physical relational operator.
3. Distributed algorithms to implement a list of functions in a distributed manner.
4. Build benchmarks to measure performance of your implementation.

To understand what SQL analytics functionality is, you could check this great explanation doc: https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts.

To learn about Beam's programming model, check: https://beam.apache.org/documentation/programming-guide/#overview

> BeamSQL aggregation analytics functionality
> ---
>
> Key: BEAM-9198
> URL: https://issues.apache.org/jira/browse/BEAM-9198
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Rui Wang
> Priority: Major
> Labels: gsoc, gsoc2020, mentor

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
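
Since the description above names rank() as the suggested first function, the following is a minimal single-process Java sketch of what RANK() computes within one partition when ordering ascending: rows with equal ordering values share a rank, and the next distinct value skips ahead by the number of tied rows. It is not BeamSQL code and says nothing about how a distributed implementation would be structured; the class `RankSketch` and its `rank` method are hypothetical names used only for illustration, and a PARTITION BY clause would correspond to grouping rows by key before calling it.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Illustrative only: RANK() semantics for a single partition, ascending order. */
final class RankSketch {

    /**
     * Returns 1-based ranks aligned with the input order.
     * Ties share a rank; the rank after a tie skips ahead (1, 1, 3, ...).
     */
    static int[] rank(int[] values) {
        // Sort row indices by value instead of sorting the values themselves,
        // so each rank can be written back to the row's original position.
        Integer[] order = new Integer[values.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(order, Comparator.comparingInt(i -> values[i]));

        int[] ranks = new int[values.length];
        for (int pos = 0; pos < order.length; pos++) {
            int row = order[pos];
            if (pos > 0 && values[row] == values[order[pos - 1]]) {
                // Same ordering value as the previous row: share its rank.
                ranks[row] = ranks[order[pos - 1]];
            } else {
                // New value: rank is the 1-based position in sorted order.
                ranks[row] = pos + 1;
            }
        }
        return ranks;
    }

    public static void main(String[] args) {
        int[] values = {10, 20, 10, 30};
        System.out.println(Arrays.toString(rank(values)));
    }
}
```

Sorting indices rather than values keeps the output aligned with the input rows, which mirrors how an analytic function adds a column to each row rather than collapsing rows the way an aggregate does.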
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411622 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 28/Mar/20 05:47 Start Date: 28/Mar/20 05:47 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11252#issuecomment-605400710 Run SQL PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411622) Time Spent: 3h 50m (was: 3h 40m) > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. 
> However, now, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message > is wrong, it says that "event time timer" but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
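The second complaint in the report (the message says "event time timer" for a processing-time timer) can be illustrated with a small sketch. This is plain Python with hypothetical names (`TimeDomain`, `check_timer_output_timestamp`); it is not the Beam runner code and it does not attempt to show the fix made in the pull request. It only shows a boundary check whose message names the domain the timer actually belongs to.

```python
from enum import Enum


class TimeDomain(Enum):
    # Hypothetical enum for this sketch only.
    EVENT_TIME = "event time"
    PROCESSING_TIME = "processing time"


def check_timer_output_timestamp(domain, output_timestamp, window_expiration):
    """Raise ValueError if the output timestamp is past the window expiration.

    Timestamps are plain integers (e.g. epoch millis) in this sketch.
    """
    if output_timestamp > window_expiration:
        # The message is built from the timer's own domain rather than a
        # hard-coded "event time" string.
        raise ValueError(
            f"Attempted to set {domain.value} timer that outputs for "
            f"{output_timestamp} but that is after the expiration of "
            f"window {window_expiration}")
```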
[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema
[ https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411615 ] ASF GitHub Bot logged work on BEAM-4374: Author: ASF GitHub Bot Created on: 28/Mar/20 04:55 Start Date: 28/Mar/20 04:55 Worklog Time Spent: 10m Work Description: lostluck commented on issue #11231: [BEAM-4374] Shortids for the Go SDK URL: https://github.com/apache/beam/pull/11231#issuecomment-605395102 R: @youngoli @lukecwik I can't wait to get rid of both of the legacy stuff and the older monitoring info listing. That function is becoming crufty. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411615) Time Spent: 35h 20m (was: 35h 10m) > Update existing metrics in the FN API to use new Metric Schema > -- > > Key: BEAM-4374 > URL: https://issues.apache.org/jira/browse/BEAM-4374 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Alex Amato >Priority: Major > Time Spent: 35h 20m > Remaining Estimate: 0h > > Update existing metrics to use the new proto and cataloging schema defined in: > [_https://s.apache.org/beam-fn-api-metrics_] > * Check in new protos > * Define catalog file for metrics > * Port existing metrics to use this new format, based on catalog > names+metadata -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema
[ https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411614&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411614 ] ASF GitHub Bot logged work on BEAM-4374: Author: ASF GitHub Bot Created on: 28/Mar/20 04:55 Start Date: 28/Mar/20 04:55 Worklog Time Spent: 10m Work Description: lostluck commented on issue #11231: [BEAM-4374] Shortids for the Go SDK URL: https://github.com/apache/beam/pull/11231#issuecomment-605395052 Run Go Postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411614) Time Spent: 35h 10m (was: 35h) > Update existing metrics in the FN API to use new Metric Schema > -- > > Key: BEAM-4374 > URL: https://issues.apache.org/jira/browse/BEAM-4374 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Alex Amato >Priority: Major > Time Spent: 35h 10m > Remaining Estimate: 0h > > Update existing metrics to use the new proto and cataloging schema defined in: > [_https://s.apache.org/beam-fn-api-metrics_] > * Check in new protos > * Define catalog file for metrics > * Port existing metrics to use this new format, based on catalog > names+metadata -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411603 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 28/Mar/20 04:17 Start Date: 28/Mar/20 04:17 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11252#issuecomment-605392133 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411603) Time Spent: 3h 40m (was: 3.5h) > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. 
[jira] [Work logged] (BEAM-9136) Add LICENSES and NOTICES to docker images
[ https://issues.apache.org/jira/browse/BEAM-9136?focusedWorklogId=411602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411602 ]

ASF GitHub Bot logged work on BEAM-9136:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 28/Mar/20 04:00
            Start Date: 28/Mar/20 04:00
    Worklog Time Spent: 10m
      Work Description: lostluck commented on pull request #11246: [BEAM-9136]Add licenses for dependencies for Go
URL: https://github.com/apache/beam/pull/11246#discussion_r399615711

## File path: sdks/go/container/license_script.sh ##
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+output_dir=third_party_licenses
+# remove output_dir if existing
+if [ -d "$output_dir" ]; then rm -rf $output_dir; fi
+
+# get go-licenses and run
+go get github.com/google/go-licenses
+$GOPATH/bin/go-licenses save "github.com/apache/beam/sdks/go/pkg/beam/" --save_path="$output_dir"

Review comment:
The Go SDK isn't currently up on all the best practices for Go. In particular, what you're running on your machine is probably not what Jenkins is pulling down, because dependency versioning isn't being handled at present. It's not currently a priority to spend effort to resolve this state of affairs. It will not happen until after SDF and Beam Schemas have been added to the SDK later this year. I do not recommend spending time *now* to do this, but please keep sufficient notes so that we can resolve the Go situation later on.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 411602)
    Time Spent: 10h 20m  (was: 10h 10m)

> Add LICENSES and NOTICES to docker images
> -----------------------------------------
>
>                 Key: BEAM-9136
>                 URL: https://issues.apache.org/jira/browse/BEAM-9136
>             Project: Beam
>          Issue Type: Task
>          Components: build-system
>            Reporter: Hannah Jiang
>            Assignee: Hannah Jiang
>            Priority: Major
>          Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> Scan dependencies and add licenses and notices of the dependencies to SDK
> docker images.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Work logged] (BEAM-9136) Add LICENSES and NOTICES to docker images
[ https://issues.apache.org/jira/browse/BEAM-9136?focusedWorklogId=411596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411596 ] ASF GitHub Bot logged work on BEAM-9136: Author: ASF GitHub Bot Created on: 28/Mar/20 03:35 Start Date: 28/Mar/20 03:35 Worklog Time Spent: 10m Work Description: tvalentyn commented on pull request #11246: [BEAM-9136]Add licenses for dependencies for Go URL: https://github.com/apache/beam/pull/11246#discussion_r399613724 ## File path: sdks/go/container/license_script.sh ## @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +output_dir=third_party_licenses +# remove output_dir if existing +if [ -d "$output_dir" ]; then rm -rf $output_dir; fi + +# get go-licenses and run +go get github.com/google/go-licenses +$GOPATH/bin/go-licenses save "github.com/apache/beam/sdks/go/pkg/beam/" --save_path="$output_dir" Review comment: cc: @lostluck This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411596) Time Spent: 10h 10m (was: 10h) > Add LICENSES and NOTICES to docker images > - > > Key: BEAM-9136 > URL: https://issues.apache.org/jira/browse/BEAM-9136 > Project: Beam > Issue Type: Task > Components: build-system >Reporter: Hannah Jiang >Assignee: Hannah Jiang >Priority: Major > Time Spent: 10h 10m > Remaining Estimate: 0h > > Scan dependencies and add licenses and notices of the dependencies to SDK > docker images. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411590&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411590 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 28/Mar/20 02:37 Start Date: 28/Mar/20 02:37 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605383329 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411590) Time Spent: 3.5h (was: 3h 20m) > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 3.5h > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. 
[jira] [Work logged] (BEAM-9446) FlinkRunner discards parallelism and execution_mode_for_batch pipeline options
[ https://issues.apache.org/jira/browse/BEAM-9446?focusedWorklogId=411589&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411589 ]

ASF GitHub Bot logged work on BEAM-9446:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 28/Mar/20 01:54
            Start Date: 28/Mar/20 01:54
    Worklog Time Spent: 10m
      Work Description: ibzib commented on pull request #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server.
URL: https://github.com/apache/beam/pull/11189#discussion_r399604848

## File path: sdks/python/apache_beam/options/pipeline_options.py ##
@@ -285,10 +289,29 @@ def get_all_options(
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
     if add_extra_args_fn:
       add_extra_args_fn(parser)
+
     known_args, unknown_args = parser.parse_known_args(self._flags)
-    if unknown_args:
-      _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
-    result = vars(known_args)
+    if retain_unknown_options:
+      i = 0
+      while i < len(unknown_args):
+        # Treat all unary flags as booleans, and all binary argument values as
+        # strings.
+        if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
+          split = unknown_args[i].rsplit('=')

Review comment:
Oh, I see what you mean now. I will commit your suggestion to avoid confusion.

However, I don't think we should support unrecognized `append` arguments. I don't want to have to infer complex argument types. As for the specific case of `experiments`, that will not be a problem because `experiments` is already recognized by the parser.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 411589)
    Time Spent: 5h 10m  (was: 5h)

> FlinkRunner discards parallelism and execution_mode_for_batch pipeline options
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-9446
>                 URL: https://issues.apache.org/jira/browse/BEAM-9446
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Kyle Weaver
>            Assignee: Kyle Weaver
>            Priority: Major
>              Labels: portability-flink
>             Fix For: 2.21.0
>
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> I need these options for TFX, but they're being discarded (I believe they are
> normally supplied by the job server).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
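The approach discussed in this review (keep unrecognized flags, treating unary flags as booleans and binary ones as strings) can be sketched outside Beam with the standard `argparse` module. The helper name `parse_retaining_unknown` is hypothetical, and splitting on the first `=` is an assumption of this sketch, not a statement about what was finally committed to the pull request.

```python
import argparse


def parse_retaining_unknown(flags, parser):
    """Parse flags, keeping unrecognized ones instead of discarding them.

    Hypothetical helper: unary unknown flags become True, `--k=v` and
    `--k v` unknown flags become strings. No type inference is attempted.
    """
    known, unknown = parser.parse_known_args(flags)
    result = vars(known)
    i = 0
    while i < len(unknown):
        name = unknown[i]
        if not name.startswith('-'):
            # A stray value with no preceding flag; skip it.
            i += 1
        elif '=' in name:
            # `--key=value`: split once so values may themselves contain '='.
            key, value = name.split('=', 1)
            result[key.lstrip('-')] = value
            i += 1
        elif i + 1 >= len(unknown) or unknown[i + 1].startswith('-'):
            # Unary flag: nothing follows, or the next token is another flag.
            result[name.lstrip('-')] = True
            i += 1
        else:
            # Binary flag: consume the following token as a string value.
            result[name.lstrip('-')] = unknown[i + 1]
            i += 2
    return result
```

Note that, as the review comment says, repeated (`append`-style) unknown flags are not supported here either: a later occurrence simply overwrites the earlier one.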
[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema
[ https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411587 ] ASF GitHub Bot logged work on BEAM-4374: Author: ASF GitHub Bot Created on: 28/Mar/20 01:35 Start Date: 28/Mar/20 01:35 Worklog Time Spent: 10m Work Description: lostluck commented on issue #11231: [BEAM-4374] Shortids for the Go SDK URL: https://github.com/apache/beam/pull/11231#issuecomment-605375733 Run Go Postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411587) Time Spent: 35h (was: 34h 50m) > Update existing metrics in the FN API to use new Metric Schema > -- > > Key: BEAM-4374 > URL: https://issues.apache.org/jira/browse/BEAM-4374 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Alex Amato >Priority: Major > Time Spent: 35h > Remaining Estimate: 0h > > Update existing metrics to use the new proto and cataloging schema defined in: > [_https://s.apache.org/beam-fn-api-metrics_] > * Check in new protos > * Define catalog file for metrics > * Port existing metrics to use this new format, based on catalog > names+metadata -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema
[ https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411586 ]

ASF GitHub Bot logged work on BEAM-4374:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 28/Mar/20 01:34
            Start Date: 28/Mar/20 01:34
    Worklog Time Spent: 10m
      Work Description: lostluck commented on pull request #11231: [BEAM-4374] Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#discussion_r399602340

## File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go ##
@@ -16,20 +16,165 @@
 package harness

 import (
+	"bytes"
+	"strconv"
+	"sync"
+	"sync/atomic"
 	"time"

+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	ppb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/ptypes"
 )

-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+type mUrn uint32
+type mType uint32
+
+// TODO: Pull these from the protos.
+var sUrns = []string{
+	"beam:metric:user:v1",
+	"beam:metric:element_count:v1",
+	"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+	"beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+	"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+	"beam:metric:ptransform_progress:remaining:v1",
+	"beam:metric:ptransform_progress:completed:v1",
+
+	"TestingSentinelUrn", // Must remain last.
+}
+
+const (
+	urnUser mUrn = iota
+	urnElementCount
+	urnStartBundle
+	urnProcessBundle
+	urnFinishBundle
+	urnProgressRemaining
+	urnProgressCompleted
+
+	urnTestSentinel // Must remain last.
+)
+
+var sTypes = []string{
+	"beam:metrics:sum_int64:v1",
+	"beam:metrics:sum_double:v1",
+	"beam:metrics:distribution_int64:v1",
+	"beam:metrics:distribution_double:v1",
+	"beam:metrics:latest_int64:v1",
+	"beam:metrics:latest_double:v1",
+	"beam:metrics:top_n_int64:v1",
+	"beam:metrics:top_n_double:v1",
+	"beam:metrics:bottom_n_int64:v1",
+	"beam:metrics:bottom_n_double:v1",
+	"beam:metrics:monitoring_table:v1",
+	"beam:metrics:progress:v1",
+
+	"TestingSentinelType", // Must remain last.
+}
+
+const (

Review comment:
Ack. I've kept the extra marker type around for the moment, but I might collapse things into the function to simplify some things.

Issue Time Tracking
-------------------
    Worklog Id:     (was: 411586)
    Time Spent: 34h 50m  (was: 34h 40m)

> Update existing metrics in the FN API to use new Metric Schema
> --------------------------------------------------------------
>
>                 Key: BEAM-4374
>                 URL: https://issues.apache.org/jira/browse/BEAM-4374
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Alex Amato
>            Priority: Major
>          Time Spent: 34h 50m
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog
> names+metadata

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
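The diff above pairs a slice of URN strings with an index-aligned block of constants, each ending in a sentinel that "must remain last". A rough Python analogue of that pattern is sketched below; the names `Urn`, `URN_STRINGS` and `urn_string` are illustrative assumptions and do not appear in the Go SDK. The point of the sentinel is that a single length check catches a string table and a constant list that have drifted apart.

```python
from enum import IntEnum


class Urn(IntEnum):
    # Order must match URN_STRINGS below (illustrative names only).
    USER = 0
    ELEMENT_COUNT = 1
    START_BUNDLE = 2
    PROCESS_BUNDLE = 3
    FINISH_BUNDLE = 4
    PROGRESS_REMAINING = 5
    PROGRESS_COMPLETED = 6
    TEST_SENTINEL = 7  # Must remain last.


URN_STRINGS = [
    "beam:metric:user:v1",
    "beam:metric:element_count:v1",
    "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
    "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
    "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
    "beam:metric:ptransform_progress:remaining:v1",
    "beam:metric:ptransform_progress:completed:v1",
    "TestingSentinelUrn",  # Must remain last.
]

# If an entry is added to one list but not the other, this fails at import time.
if len(URN_STRINGS) != len(Urn):
    raise RuntimeError("Urn enum and URN_STRINGS are out of sync")


def urn_string(urn):
    """Look up the URN string for a compact integer identifier."""
    return URN_STRINGS[urn]
```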
[jira] [Commented] (BEAM-9626) pymongo should be an optional requirement
[ https://issues.apache.org/jira/browse/BEAM-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17069151#comment-17069151 ] Ahmet Altay commented on BEAM-9626: --- Why do we want to make this change especially retroactively for an IO that is already in the core package? What about the other IOs? I do not think we would like to set a precedent for moving packages out of beam core unless there is a deeper issue. Also specifically for python IOs the growth rate outside of gcp package was not really significant so far. > pymongo should be an optional requirement > - > > Key: BEAM-9626 > URL: https://issues.apache.org/jira/browse/BEAM-9626 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > The pymongo driver is installed by default, but as the number of IO > connectors in the python sdk grows, I don't think this is the precedent we > want to set. We already have "extra" packages for gcp, aws, and interactive, > we should also add one for mongo. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
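One common way to make a driver optional is to import it lazily and fail with an actionable message. The sketch below is a generic pattern, not Beam's implementation; the helper name `require_optional` and the `mongo` extra mentioned in the usage note are hypothetical (the issue only proposes adding such an extra).

```python
import importlib


def require_optional(module_name, extra):
    """Import an optional dependency, or explain which extra provides it.

    Hypothetical helper for illustration only.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Chain the original error so the underlying cause stays visible.
        raise ImportError(
            f"{module_name} is required for this connector; "
            f"install the '{extra}' extra to use it") from exc
```

A connector would then call `require_optional("pymongo", "mongo")` inside the code path that needs the driver, so that importing the package itself does not require pymongo to be installed.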
[jira] [Created] (BEAM-9627) KafkaIO needs better support for SSL
Daniel Mills created BEAM-9627: -- Summary: KafkaIO needs better support for SSL Key: BEAM-9627 URL: https://issues.apache.org/jira/browse/BEAM-9627 Project: Beam Issue Type: Improvement Components: io-java-kafka Reporter: Daniel Mills Configuring SSL for kafka requires pointing an option at local files containing keys and roots of trust as described here: [https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/] Currently, it is somewhat tricky to ensure that these files are written before KafkaIO starts reading from the source; one potential option would be to add an init hook where the user could download keys from the keystore of their choice and write them to local files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
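The proposed init hook can be sketched as a function that runs before the reader starts and writes key material to local files. This is an illustration in plain Python, not a KafkaIO API: `materialize_ssl_files` and the `fetch_secret` callback are hypothetical, and the `.jks` suffix is an assumption. Only the two config keys, `ssl.truststore.location` and `ssl.keystore.location`, are standard Kafka client settings.

```python
import os
import tempfile


def materialize_ssl_files(fetch_secret, directory=None):
    """Write trust store and key store bytes to local files.

    `fetch_secret(name) -> bytes` is a user-supplied callback (hypothetical)
    that downloads the material from whatever key store the user prefers.
    Returns a dict of Kafka client config entries pointing at the files.
    """
    directory = directory or tempfile.mkdtemp(prefix="kafka-ssl-")
    config = {}
    for config_key, secret_name in (
            ("ssl.truststore.location", "truststore"),
            ("ssl.keystore.location", "keystore")):
        path = os.path.join(directory, secret_name + ".jks")
        with open(path, "wb") as f:
            f.write(fetch_secret(secret_name))
        # Restrict access, since the files contain key material.
        os.chmod(path, 0o600)
        config[config_key] = path
    return config
```

Because the function returns only after both files are closed, anything that consumes the returned config is guaranteed to see fully written files, which is the ordering problem the issue describes.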
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411550&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411550 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399591425 ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False): class TimerCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" - def __init__(self, payload_coder_impl): + def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl): Review comment: why do you have to specify the tag_coder_impl, shouldn't it always be Python's string utf8 coder? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411550) Time Spent: 6h 10m (was: 6h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411558 ]

ASF GitHub Bot logged work on BEAM-9562:
----------------------------------------
                Author: ASF GitHub Bot
            Created on: 28/Mar/20 00:22
            Start Date: 28/Mar/20 00:22
    Worklog Time Spent: 10m
      Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591742

## File path: sdks/python/apache_beam/coders/coder_impl.py ##
@@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
     self._timestamp_coder_impl = TimestampCoderImpl()
-    self._payload_coder_impl = payload_coder_impl
+    self._boolean_coder_impl = BooleanCoderImpl()
+    self._pane_info_coder_impl = PaneInfoCoderImpl()
+    self._key_coder_impl = key_coder_impl
+    self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+    self._tag_coder_impl = tag_coder_impl

   def encode_to_stream(self, value, out, nested):
     # type: (dict, create_OutputStream, bool) -> None
-    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+    self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
+    self._tag_coder_impl.encode_to_stream(
+        value.dynamic_timer_tag, out, True)
+    self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+    if not value.clear_bit:
+      self._timestamp_coder_impl.encode_to_stream(
+          value.fire_timestamp, out, True)
+      self._timestamp_coder_impl.encode_to_stream(
+          value.hold_timestamp, out, True)
+      self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+      self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)

   def decode_from_stream(self, in_stream, nested):
     # type: (create_InputStream, bool) -> dict
     # TODO(robertwb): Consider using a concrete class rather than a dict here.
-    return dict(
-        timestamp=self._timestamp_coder_impl.decode_from_stream(
+    from apache_beam.transforms import userstate
+    user_key = self._key_coder_impl.decode_from_stream(in_stream, nested)

Review comment:
```suggestion
    user_key = self._key_coder_impl.decode_from_stream(in_stream, True)
```

Issue Time Tracking
-------------------
    Worklog Id:     (was: 411558)
    Time Spent: 6.5h  (was: 6h 20m)

> Remove timer from PCollection and treat timers as Elements
> ----------------------------------------------------------
>
>                 Key: BEAM-9562
>                 URL: https://issues.apache.org/jira/browse/BEAM-9562
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-harness
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
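The shape of the encoding in this diff (key, tag, clear bit, then the remaining fields only when the clear bit is unset) can be shown with a self-contained round trip. The byte layout below is invented for this sketch (length-prefixed UTF-8 strings, big-endian 64-bit timestamps) and is not Beam's wire format; windows and pane info are omitted, and the `Timer` tuple is a stand-in, not a Beam class.

```python
import io
import struct
from collections import namedtuple

# Stand-in for the timer value in the diff; not a Beam class.
Timer = namedtuple(
    "Timer", "user_key dynamic_timer_tag clear_bit fire_timestamp hold_timestamp")


def _write_bytes(out, data):
    # Length-prefix every field so the reader knows where it ends, which is
    # the role "nested" encoding plays for fields that are not last.
    out.write(struct.pack(">I", len(data)))
    out.write(data)


def _read_bytes(inp):
    (length,) = struct.unpack(">I", inp.read(4))
    return inp.read(length)


def encode_timer(timer):
    out = io.BytesIO()
    _write_bytes(out, timer.user_key.encode("utf-8"))
    _write_bytes(out, timer.dynamic_timer_tag.encode("utf-8"))
    out.write(b"\x01" if timer.clear_bit else b"\x00")
    if not timer.clear_bit:
        # A cleared timer carries no timestamps at all.
        out.write(struct.pack(">qq", timer.fire_timestamp, timer.hold_timestamp))
    return out.getvalue()


def decode_timer(data):
    inp = io.BytesIO(data)
    # Decoding must mirror encoding field by field, with the same framing.
    user_key = _read_bytes(inp).decode("utf-8")
    tag = _read_bytes(inp).decode("utf-8")
    clear_bit = inp.read(1) == b"\x01"
    if clear_bit:
        return Timer(user_key, tag, True, None, None)
    fire, hold = struct.unpack(">qq", inp.read(16))
    return Timer(user_key, tag, False, fire, hold)
```

The symmetry requirement is the reason a reviewer would want the encode and decode sides to use the same framing for the key: if one side length-prefixes a field and the other does not, every subsequent field is read from the wrong offset.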
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411546 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399587568 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform pTransform) { ((KvCoder) mainInput.getCoder()).getKeyCoder(), // TODO: Add support for timer payloads to the SDK // We currently assume that all payloads are unspecified. -Timer.Coder.of(VoidCoder.of(; +Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE))); Review comment: ```suggestion Timer.Coder.of(((KvCoder) mainInput.getCoder()).getKeyCoder(), GlobalWindow.Coder.INSTANCE))); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411546) Time Spent: 5h 50m (was: 5h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411557&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411557 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399589246 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -73,59 +107,81 @@ */ public static class Coder extends StructuredCoder> { -public static Coder of(org.apache.beam.sdk.coders.Coder payloadCoder) { - return new Coder(payloadCoder); +public static Coder of( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + return new Coder<>(keyCoder, windowCoder); } -private final org.apache.beam.sdk.coders.Coder payloadCoder; +private final org.apache.beam.sdk.coders.Coder keyCoder; +private final org.apache.beam.sdk.coders.Coder> +windowsCoder; -private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) { - this.payloadCoder = payloadCoder; +private Coder( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + this.keyCoder = keyCoder; + this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder); + ; } @Override public void encode(Timer timer, OutputStream outStream) throws CoderException, IOException { - InstantCoder.of().encode(timer.getTimestamp(), outStream); - payloadCoder.encode(timer.getPayload(), outStream); + keyCoder.encode(timer.getUserKey(), outStream); + StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream); + BooleanCoder.of().encode(timer.getClearBit(), outStream); + if (!timer.getClearBit()) { +InstantCoder.of().encode(timer.getFireTimestamp(), outStream); 
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream); +windowsCoder.encode(timer.getWindows(), outStream); +PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream); + } } @Override public Timer decode(InputStream inStream) throws CoderException, IOException { - Instant instant = InstantCoder.of().decode(inStream); - T value = payloadCoder.decode(inStream); - return Timer.of(instant, value); + T userKey = keyCoder.decode(inStream); + String dynamicTimerTag = StringUtf8Coder.of().decode(inStream); + Boolean clearBit = BooleanCoder.of().decode(inStream); Review comment: ```suggestion boolean clearBit = BooleanCoder.of().decode(inStream); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411557) Time Spent: 6h 20m (was: 6h 10m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411555&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411555 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399591519 ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False): class TimerCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" - def __init__(self, payload_coder_impl): + def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl): self._timestamp_coder_impl = TimestampCoderImpl() -self._payload_coder_impl = payload_coder_impl +self._boolean_coder_impl = BooleanCoderImpl() +self._pane_info_coder_impl = PaneInfoCoderImpl() +self._key_coder_impl = key_coder_impl +self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl) +self._tag_coder_impl = tag_coder_impl def encode_to_stream(self, value, out, nested): # type: (dict, create_OutputStream, bool) -> None -self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True) -self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True) +self._key_coder_impl.encode_to_stream(value.user_key, out, nested) +self._tag_coder_impl.encode_to_stream( +value.dynamic_timer_tag, out, True) +self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True) +if not value.clear_bit: + self._timestamp_coder_impl.encode_to_stream( + value.fire_timestamp, out, True) + self._timestamp_coder_impl.encode_to_stream( + value.hold_timestamp, out, True) + self._windows_coder_impl.encode_to_stream(value.windows, out, True) + self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, 
True) Review comment: ```suggestion self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, nested) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411555) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411548&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411548 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399588039 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,14 +49,26 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a timer for the given timestamp with a user specified payload. Review comment: ```suggestion * Returns a timer for the given timestamp with a user specified key. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411548) Time Spent: 6h (was: 5h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411552&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411552 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399589633 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -73,59 +107,81 @@ */ public static class Coder extends StructuredCoder> { -public static Coder of(org.apache.beam.sdk.coders.Coder payloadCoder) { - return new Coder(payloadCoder); +public static Coder of( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + return new Coder<>(keyCoder, windowCoder); } -private final org.apache.beam.sdk.coders.Coder payloadCoder; +private final org.apache.beam.sdk.coders.Coder keyCoder; +private final org.apache.beam.sdk.coders.Coder> +windowsCoder; -private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) { - this.payloadCoder = payloadCoder; +private Coder( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + this.keyCoder = keyCoder; + this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder); + ; } @Override public void encode(Timer timer, OutputStream outStream) throws CoderException, IOException { - InstantCoder.of().encode(timer.getTimestamp(), outStream); - payloadCoder.encode(timer.getPayload(), outStream); + keyCoder.encode(timer.getUserKey(), outStream); + StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream); + BooleanCoder.of().encode(timer.getClearBit(), outStream); + if (!timer.getClearBit()) { +InstantCoder.of().encode(timer.getFireTimestamp(), outStream); 
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream); +windowsCoder.encode(timer.getWindows(), outStream); +PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream); + } } @Override public Timer decode(InputStream inStream) throws CoderException, IOException { - Instant instant = InstantCoder.of().decode(inStream); - T value = payloadCoder.decode(inStream); - return Timer.of(instant, value); + T userKey = keyCoder.decode(inStream); + String dynamicTimerTag = StringUtf8Coder.of().decode(inStream); + Boolean clearBit = BooleanCoder.of().decode(inStream); + if (clearBit) { +return Timer.of(userKey, dynamicTimerTag, clearBit); + } + Instant fireTimestamp = InstantCoder.of().decode(inStream); + Instant holeTimestamp = InstantCoder.of().decode(inStream); + Collection windows = windowsCoder.decode(inStream); + PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); + return Timer.of( + userKey, dynamicTimerTag, clearBit, fireTimestamp, holeTimestamp, windows, pane); } @Override public List> getCoderArguments() { - return Collections.singletonList(payloadCoder); -} - -@Override -public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic(this, "Payload coder must be deterministic", payloadCoder); + return Collections.singletonList(keyCoder); } @Override -public boolean consistentWithEquals() { - return payloadCoder.consistentWithEquals(); +public List> getComponents() { + return Arrays.asList(keyCoder, windowsCoder); } @Override -public Object structuralValue(Timer value) { - return Timer.of(value.getTimestamp(), payloadCoder.structuralValue(value.getPayload())); -} - -@Override -public boolean isRegisterByteSizeObserverCheap(Timer value) { - return payloadCoder.isRegisterByteSizeObserverCheap(value.getPayload()); +public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic(this, "UserKey coder must be deterministic", keyCoder); + verifyDeterministic(this, "Windows coder must be deterministic", 
windowsCoder); Review comment: ```suggestion verifyDeterministic(this, "Window coder must be deterministic", windowsCoder); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411554&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411554 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399588782 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -73,59 +107,81 @@ */ public static class Coder extends StructuredCoder> { Review comment: please update class comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411554) Time Spent: 6h 10m (was: 6h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411551&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411551 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399591496 ## File path: sdks/python/apache_beam/coders/coder_impl.py ## @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False): class TimerCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" - def __init__(self, payload_coder_impl): + def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl): self._timestamp_coder_impl = TimestampCoderImpl() -self._payload_coder_impl = payload_coder_impl +self._boolean_coder_impl = BooleanCoderImpl() +self._pane_info_coder_impl = PaneInfoCoderImpl() +self._key_coder_impl = key_coder_impl +self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl) +self._tag_coder_impl = tag_coder_impl def encode_to_stream(self, value, out, nested): # type: (dict, create_OutputStream, bool) -> None -self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True) -self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True) +self._key_coder_impl.encode_to_stream(value.user_key, out, nested) Review comment: ```suggestion self._key_coder_impl.encode_to_stream(value.user_key, out, True) ``` We should ignore the nested field since it will change the encoding. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411551) Time Spent: 6h 10m (was: 6h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
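A note on the review comment above: forwarding `nested` into the key coder can change the bytes written for the key, whereas passing `True` keeps the timer encoding fixed. The sketch below is not Beam code. It uses a hypothetical `encode_bytes` helper and assumes a 4-byte big-endian length prefix in nested context (the real coders may use a different prefix), purely to show how the same key can come out as two different byte strings.

```python
import io
import struct


def encode_bytes(value, out, nested):
    """Hypothetical stand-in for a bytes coder impl.

    In nested context the value is length-prefixed so a reader knows where
    it ends; in the outer context it is written raw.
    """
    if nested:
        out.write(struct.pack(">I", len(value)))
    out.write(value)


def encode_timer_prefix(user_key, tag, nested_for_key):
    """Writes the first two timer fields: the user key, then the tag."""
    out = io.BytesIO()
    encode_bytes(user_key, out, nested_for_key)
    encode_bytes(tag, out, True)  # the tag is always written as nested
    return out.getvalue()


# Key always encoded as nested: every field is self-delimiting.
fixed = encode_timer_prefix(b"k1", b"tag", True)
# Key encoded with a forwarded nested=False: the key has no length prefix,
# so a reader cannot tell where the key ends and the tag begins.
forwarded = encode_timer_prefix(b"k1", b"tag", False)
```

With the key written unprefixed, the decoder would have to guess the key length, which is why a coder that is followed by more fields is normally encoded in nested context.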
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411549&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411549 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399588665 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -57,11 +78,24 @@ * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this * timer. */ - public abstract Instant getTimestamp(); + @Nullable + public abstract T getUserKey(); + + public abstract String getDynamicTimerTag(); + + public abstract Boolean getClearBit(); Review comment: ```suggestion public abstract boolean getClearBit(); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411549) Time Spent: 6h (was: 5h 50m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411556&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411556 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399589315 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -73,59 +107,81 @@ */ public static class Coder extends StructuredCoder> { -public static Coder of(org.apache.beam.sdk.coders.Coder payloadCoder) { - return new Coder(payloadCoder); +public static Coder of( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + return new Coder<>(keyCoder, windowCoder); } -private final org.apache.beam.sdk.coders.Coder payloadCoder; +private final org.apache.beam.sdk.coders.Coder keyCoder; +private final org.apache.beam.sdk.coders.Coder> +windowsCoder; -private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) { - this.payloadCoder = payloadCoder; +private Coder( +org.apache.beam.sdk.coders.Coder keyCoder, +org.apache.beam.sdk.coders.Coder windowCoder) { + this.keyCoder = keyCoder; + this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder); + ; } @Override public void encode(Timer timer, OutputStream outStream) throws CoderException, IOException { - InstantCoder.of().encode(timer.getTimestamp(), outStream); - payloadCoder.encode(timer.getPayload(), outStream); + keyCoder.encode(timer.getUserKey(), outStream); + StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream); + BooleanCoder.of().encode(timer.getClearBit(), outStream); + if (!timer.getClearBit()) { +InstantCoder.of().encode(timer.getFireTimestamp(), outStream); 
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream); +windowsCoder.encode(timer.getWindows(), outStream); +PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream); + } } @Override public Timer decode(InputStream inStream) throws CoderException, IOException { - Instant instant = InstantCoder.of().decode(inStream); - T value = payloadCoder.decode(inStream); - return Timer.of(instant, value); + T userKey = keyCoder.decode(inStream); + String dynamicTimerTag = StringUtf8Coder.of().decode(inStream); + Boolean clearBit = BooleanCoder.of().decode(inStream); + if (clearBit) { +return Timer.of(userKey, dynamicTimerTag, clearBit); + } + Instant fireTimestamp = InstantCoder.of().decode(inStream); + Instant holeTimestamp = InstantCoder.of().decode(inStream); Review comment: ```suggestion Instant holdTimestamp = InstantCoder.of().decode(inStream); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411556) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
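The field order in the diff above (user key, dynamic timer tag, clear bit, and then fire timestamp, hold timestamp, windows, and pane only when the timer is not cleared) can be sketched in Python as follows. This is an illustration of the layout only, not the Beam coder: the `Timer` tuple, the integer window ids, the one-byte pane, and the fixed-width `struct` formats are all assumptions made to keep the example self-contained.

```python
import io
import struct
from typing import NamedTuple, Optional, Tuple


class Timer(NamedTuple):
    user_key: bytes
    dynamic_timer_tag: str
    clear_bit: bool
    fire_timestamp: Optional[int] = None  # millis; absent when cleared
    hold_timestamp: Optional[int] = None  # millis; absent when cleared
    windows: Tuple[int, ...] = ()         # hypothetical: windows as int ids
    pane: int = 0                         # hypothetical: pane as one byte


def _write_bytes(out, data):
    out.write(struct.pack(">I", len(data)))
    out.write(data)


def _read_bytes(inp):
    (length,) = struct.unpack(">I", inp.read(4))
    return inp.read(length)


def encode(timer):
    out = io.BytesIO()
    _write_bytes(out, timer.user_key)
    _write_bytes(out, timer.dynamic_timer_tag.encode("utf-8"))
    out.write(struct.pack(">?", timer.clear_bit))
    if not timer.clear_bit:
        # The remaining fields exist only for timers that are being set.
        out.write(struct.pack(">qq", timer.fire_timestamp, timer.hold_timestamp))
        out.write(struct.pack(">I", len(timer.windows)))
        for window in timer.windows:
            out.write(struct.pack(">q", window))
        out.write(struct.pack(">B", timer.pane))
    return out.getvalue()


def decode(data):
    inp = io.BytesIO(data)
    user_key = _read_bytes(inp)
    tag = _read_bytes(inp).decode("utf-8")
    (clear_bit,) = struct.unpack(">?", inp.read(1))
    if clear_bit:
        # A cleared timer carries nothing after the clear bit.
        return Timer(user_key, tag, True)
    fire, hold = struct.unpack(">qq", inp.read(16))
    (count,) = struct.unpack(">I", inp.read(4))
    windows = tuple(struct.unpack(">q", inp.read(8))[0] for _ in range(count))
    (pane,) = struct.unpack(">B", inp.read(1))
    return Timer(user_key, tag, False, fire, hold, windows, pane)
```

The decoder has to branch on the clear bit in exactly the same place as the encoder, otherwise it would read past the end of a cleared timer.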
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411553 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399588611 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java ## @@ -40,14 +49,26 @@ @AutoValue public abstract class Timer { - /** Returns a timer for the given timestamp with a {@code null} payload. */ - public static Timer of(Instant time) { -return of(time, (Void) null); + /** + * Returns a timer for the given timestamp with a user specified payload. + * + * @return + */ + // TODO(BEAM-9562): Plumb through actual Timer fields. + public static Timer of( Review comment: May I suggest creating two static factory methods for the two common cases? .of(userKey, dynamicTimerTag, fireTimestamp, holdTimestamp, windows, pane); .cleared(userKey, dynamicTimerTag); That way you can make sure that dynamicTimerTag, fireTimestamp, holdTimestamp, windows, and pane are not null. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411553) Time Spent: 6h 10m (was: 6h) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
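The two-factory suggestion in the review comment above can be sketched as follows. Python is used only for illustration (the file under review is Java), and the `Timer` dataclass, its field types, and the `ValueError` checks are assumptions rather than the code that was merged.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Timer:
    user_key: object
    dynamic_timer_tag: str
    clear_bit: bool
    fire_timestamp: Optional[int] = None
    hold_timestamp: Optional[int] = None
    windows: Optional[Tuple[object, ...]] = None
    pane: Optional[object] = None

    @classmethod
    def of(cls, user_key, dynamic_timer_tag, fire_timestamp, hold_timestamp,
           windows, pane):
        """Creates a timer that is being set; every field must be present."""
        required = {
            "dynamic_timer_tag": dynamic_timer_tag,
            "fire_timestamp": fire_timestamp,
            "hold_timestamp": hold_timestamp,
            "windows": windows,
            "pane": pane,
        }
        for name, value in required.items():
            if value is None:
                raise ValueError(f"{name} must not be None")
        return cls(user_key, dynamic_timer_tag, False, fire_timestamp,
                   hold_timestamp, tuple(windows), pane)

    @classmethod
    def cleared(cls, user_key, dynamic_timer_tag):
        """Creates a timer that is being cleared; only key and tag apply."""
        if dynamic_timer_tag is None:
            raise ValueError("dynamic_timer_tag must not be None")
        return cls(user_key, dynamic_timer_tag, True)
```

Callers never pass the clear bit themselves, so a set timer with missing timestamps or a cleared timer with stray timestamps cannot be constructed through these two entry points.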
[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements
[ https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411547 ] ASF GitHub Bot logged work on BEAM-9562: Author: ASF GitHub Bot Created on: 28/Mar/20 00:22 Start Date: 28/Mar/20 00:22 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11199: [BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers URL: https://github.com/apache/beam/pull/11199#discussion_r399587616 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform pTransform) { ((KvCoder) mainInput.getCoder()).getKeyCoder(), // TODO: Add support for timer payloads to the SDK // We currently assume that all payloads are unspecified. Review comment: ```suggestion ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411547) Time Spent: 5h 50m (was: 5h 40m) > Remove timer from PCollection and treat timers as Elements > --- > > Key: BEAM-9562 > URL: https://issues.apache.org/jira/browse/BEAM-9562 > Project: Beam > Issue Type: New Feature > Components: sdk-py-harness >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8466) Python typehints: pep 484 warn and strict modes
[ https://issues.apache.org/jira/browse/BEAM-8466?focusedWorklogId=411524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411524 ] ASF GitHub Bot logged work on BEAM-8466: Author: ASF GitHub Bot Created on: 28/Mar/20 00:00 Start Date: 28/Mar/20 00:00 Worklog Time Spent: 10m Work Description: udim commented on issue #11240: [BEAM-8466] Make strip_iterable more strict URL: https://github.com/apache/beam/pull/11240#issuecomment-605361748 I would like to test this internally before merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411524) Time Spent: 1h 40m (was: 1.5h) > Python typehints: pep 484 warn and strict modes > --- > > Key: BEAM-8466 > URL: https://issues.apache.org/jira/browse/BEAM-8466 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > Allow type checking to use PEP 484 type hints in two modes: a warn mode that > only warns when there are errors, and a strict mode that raises exceptions on > errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
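The two modes described in the issue above might look roughly like this. The `check_call` helper and the mode names `"warn"` and `"strict"` are hypothetical and do not reflect Beam's type-checking API; the sketch handles only plain classes as hints, not generics.

```python
import inspect
import warnings
from typing import get_type_hints


def check_call(func, mode, *args):
    """Checks positional args against func's PEP 484 hints, then calls it.

    mode "warn" emits a warning on a mismatch and proceeds;
    mode "strict" raises TypeError instead.
    """
    hints = get_type_hints(func)
    names = list(inspect.signature(func).parameters)
    for name, arg in zip(names, args):
        expected = hints.get(name)
        # Only plain classes are checked in this sketch.
        if isinstance(expected, type) and not isinstance(arg, expected):
            message = (
                f"{func.__name__}: argument {name!r} expected "
                f"{expected.__name__}, got {type(arg).__name__}")
            if mode == "strict":
                raise TypeError(message)
            warnings.warn(message)
    return func(*args)
```

The same check drives both modes; only the reaction to a mismatch differs, which makes it possible to roll out hints in warn mode first and switch to strict later.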
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411523&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411523 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 23:57 Start Date: 27/Mar/20 23:57 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11252#issuecomment-605361234 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411523) Time Spent: 3h 20m (was: 3h 10m) > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 3h 20m > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. 
> Now, however, this results in an error: > {code:java} > java.lang.IllegalArgumentException: Attempted to set event time timer that > outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of > window 2020-03-19T17:59:59.999Z > > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011) > > org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934) > .processElement(???.scala:187) > {code} > > I think the regression was introduced in commit > a005fd765a762183ca88df90f261f6d4a20cf3e0. Also note that the error message > is wrong: it says "event time timer", but the timer is in the processing > time domain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement
[ https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411518&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411518 ] ASF GitHub Bot logged work on BEAM-9626: Author: ASF GitHub Bot Created on: 27/Mar/20 23:43 Start Date: 27/Mar/20 23:43 Worklog Time Spent: 10m Work Description: chadrik commented on issue #11256: [BEAM-9626] Make pymongo an optional requirement URL: https://github.com/apache/beam/pull/11256#issuecomment-605358697 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411518) Time Spent: 0.5h (was: 20m) > pymongo should be an optional requirement > - > > Key: BEAM-9626 > URL: https://issues.apache.org/jira/browse/BEAM-9626 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > The pymongo driver is installed by default, but as the number of IO > connectors in the Python SDK grows, I don't think this is the precedent we > want to set. We already have "extra" packages for gcp, aws, and interactive; > we should also add one for mongo. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
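One common way to make a driver such as pymongo optional is to import it lazily and point the user at the matching extra when it is missing. The sketch below is an assumption about how that could look, not the change made in the pull request; `load_optional` and the extra name `mongo` are hypothetical.

```python
import importlib


def load_optional(module_name, extra):
    """Imports an optional dependency or explains how to install it.

    `extra` is the (hypothetical) name of the pip extra that would pull in
    the module, for example "mongo" for pymongo.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(
            f"{module_name} is not installed; "
            f"install it with: pip install 'apache-beam[{extra}]'") from e
```

A connector would call `load_optional("pymongo", "mongo")` at the point where it first needs the driver, so users who never touch that connector do not need the package at all.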
[jira] [Work logged] (BEAM-8292) Add a Reshuffle PTransform preventing fusion of the surrounding transforms
[ https://issues.apache.org/jira/browse/BEAM-8292?focusedWorklogId=411508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411508 ] ASF GitHub Bot logged work on BEAM-8292: Author: ASF GitHub Bot Created on: 27/Mar/20 23:40 Start Date: 27/Mar/20 23:40 Worklog Time Spent: 10m Work Description: lostluck commented on pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411508) Time Spent: 4h (was: 3h 50m) > Add a Reshuffle PTransform preventing fusion of the surrounding transforms > -- > > Key: BEAM-8292 > URL: https://issues.apache.org/jira/browse/BEAM-8292 > Project: Beam > Issue Type: New Feature > Components: sdk-go >Reporter: John Patoch >Assignee: Robert Burke >Priority: Minor > Time Spent: 4h > Remaining Estimate: 0h > > Reshuffle is a PTransform that takes a PCollection and shuffles the data > to help increase parallelism. > Reshuffle adds a temporary random key to each element, performs a > GroupByKey, and finally removes the temporary key. -- This message was sent by Atlassian Jira (v8.3.4#803005)
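The three steps in the issue description above (add a temporary random key, group by that key, drop the key) can be sketched in plain Python. This is not the Go SDK implementation; `reshuffle`, `num_buckets`, and `seed` are names made up for the example, and an in-memory dict stands in for GroupByKey.

```python
import random
from collections import defaultdict


def reshuffle(elements, num_buckets=4, seed=None):
    """Redistributes elements via a temporary random key."""
    rng = random.Random(seed)
    # 1. Attach a temporary random key to each element.
    keyed = [(rng.randrange(num_buckets), element) for element in elements]
    # 2. Group by the temporary key (stand-in for GroupByKey).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # 3. Drop the temporary key and emit the elements group by group.
    return [element for key in sorted(groups) for element in groups[key]]
```

The output holds the same elements as the input, generally in a different order. In a real runner the grouping step is what breaks fusion, because each group can be handed to a different worker.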
[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.
[ https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411505 ] ASF GitHub Bot logged work on BEAM-9577: Author: ASF GitHub Bot Created on: 27/Mar/20 23:35 Start Date: 27/Mar/20 23:35 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service. URL: https://github.com/apache/beam/pull/11203#discussion_r399583281 ## File path: model/job-management/src/main/proto/beam_artifact_api.proto ## @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi"; import "beam_runner_api.proto"; -// A service to stage artifacts for use in a Job. +// A service to retrieve artifacts for use in a Job. +service ArtifactRetrievalService { + // Resolves the given artifact reference into one or more simpler artifact + // references (e.g. a Maven dependency into a (transitive) set of jars. + // If no further simplification is possible, returns the original artifacts, Review comment: Clarified in the docs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411505) Time Spent: 3h (was: 2h 50m) > Update artifact staging and retrieval protocols to be dependency aware. > --- > > Key: BEAM-9577 > URL: https://issues.apache.org/jira/browse/BEAM-9577 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.
[ https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411503 ] ASF GitHub Bot logged work on BEAM-9577: Author: ASF GitHub Bot Created on: 27/Mar/20 23:35 Start Date: 27/Mar/20 23:35 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service. URL: https://github.com/apache/beam/pull/11203#discussion_r399581813 ## File path: model/job-management/src/main/proto/beam_artifact_api.proto ## @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi"; import "beam_runner_api.proto"; -// A service to stage artifacts for use in a Job. +// A service to retrieve artifacts for use in a Job. +service ArtifactRetrievalService { + // Resolves the given artifact reference into one or more simpler artifact + // references (e.g. a Maven dependency into a (transitive) set of jars. + // If no further simplification is possible, returns the original artifacts, + // at which point, all artifacts must be gettable. + rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse); + + // Retrieves the given artifact as a stream of bytes. + rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse); +} + +// A service that allows the client to act as an ArtifactRetrievalService, +// for a particular job with the server initiating requests and receiving +// responses. +// A client calls the service with an ArtifactResponseWrapper that has the +// staging token set, and thereafter responds to the server's requests. service ArtifactStagingService { + rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper) Review comment: Good question. As discussed offline, multiple requests can be pushed before waiting for a response, so we should be able to fill the network buffer. Either the server or client can choose to parallelize on its backend store as needed. 
I added an error field to the resolve_artifacts field as a runner may attempt to opportunistically consolidate environments, and then fall back to doing each separately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411503) Time Spent: 2h 50m (was: 2h 40m) > Update artifact staging and retrieval protocols to be dependency aware. > --- > > Key: BEAM-9577 > URL: https://issues.apache.org/jira/browse/BEAM-9577 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
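The ResolveArtifact contract in the diff above (keep simplifying until the service returns the artifacts unchanged, at which point everything must be gettable) might be driven by a loop like the one below. A minimal sketch in plain Python: `resolve_until_stable`, `toy_resolve`, and the `maven:`/`file:` strings are hypothetical stand-ins, not part of the proposed proto or of any Beam API.

```python
def resolve_until_stable(artifacts, resolve_once, max_rounds=10):
    """Calls resolve_once repeatedly until the artifact list stops changing."""
    current = list(artifacts)
    for _ in range(max_rounds):
        resolved = list(resolve_once(current))
        if resolved == current:
            # No further simplification: every artifact should now be gettable.
            return current
        current = resolved
    raise RuntimeError('artifact resolution did not converge')


def toy_resolve(artifacts):
    """Hypothetical resolver: expands one made-up Maven coordinate into jars."""
    expansions = {'maven:example:lib:1.0': ['file:lib.jar', 'file:dep.jar']}
    result = []
    for artifact in artifacts:
        result.extend(expansions.get(artifact, [artifact]))
    return result
```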
[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.
[ https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411504 ] ASF GitHub Bot logged work on BEAM-9577: Author: ASF GitHub Bot Created on: 27/Mar/20 23:35 Start Date: 27/Mar/20 23:35 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service. URL: https://github.com/apache/beam/pull/11203#discussion_r399582853 ## File path: model/job-management/src/main/proto/beam_artifact_api.proto ## @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi"; import "beam_runner_api.proto"; -// A service to stage artifacts for use in a Job. +// A service to retrieve artifacts for use in a Job. +service ArtifactRetrievalService { + // Resolves the given artifact reference into one or more simpler artifact + // references (e.g. a Maven dependency into a (transitive) set of jars. + // If no further simplification is possible, returns the original artifacts, + // at which point, all artifacts must be gettable. + rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse); + + // Retrieves the given artifact as a stream of bytes. + rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse); +} + +// A service that allows the client to act as an ArtifactRetrievalService, +// for a particular job with the server initiating requests and receiving +// responses. +// A client calls the service with an ArtifactResponseWrapper that has the +// staging token set, and thereafter responds to the server's requests. service ArtifactStagingService { + rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper) + returns (stream ArtifactRequestWrapper); +} + +// A request for artifact resolution. +message ResolveArtifactRequest { + repeated org.apache.beam.model.pipeline.v1.ArtifactInformation artifacts = 1; Review comment: The documentation has been expanded. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411504) Time Spent: 2h 50m (was: 2h 40m) > Update artifact staging and retrieval protocols to be dependency aware. > --- > > Key: BEAM-9577 > URL: https://issues.apache.org/jira/browse/BEAM-9577 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement
[ https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411493 ] ASF GitHub Bot logged work on BEAM-9626: Author: ASF GitHub Bot Created on: 27/Mar/20 23:28 Start Date: 27/Mar/20 23:28 Worklog Time Spent: 10m Work Description: chadrik commented on issue #11256: [BEAM-9626] Make pymongo an optional requirement URL: https://github.com/apache/beam/pull/11256#issuecomment-605355110 R: @aaltay R: @kamilwu R: @y1chi This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411493) Time Spent: 20m (was: 10m) > pymongo should be an optional requirement > - > > Key: BEAM-9626 > URL: https://issues.apache.org/jira/browse/BEAM-9626 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > The pymongo driver is installed by default, but as the number of IO > connectors in the python sdk grows, I don't think this is the precedent we > want to set. We already have "extra" packages for gcp, aws, and interactive, > we should also add one for mongo. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement
[ https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411492&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411492 ] ASF GitHub Bot logged work on BEAM-9626: Author: ASF GitHub Bot Created on: 27/Mar/20 23:26 Start Date: 27/Mar/20 23:26 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #11256: [BEAM-9626] Make pymongo an optional requirement URL: https://github.com/apache/beam/pull/11256 The pymongo driver is installed by default, but as the number of IO connectors in the python sdk grows, I don't think this is the precedent we want to set. We already have "extra" packages for gcp, aws, and interactive, we should also add one for mongo. After this PR is merged, the mongo deps can be installed using `pip install apache_beam[mongo]`
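One common way to make a driver such as pymongo optional is to guard the import and fail only when the connector is actually used. A minimal sketch under that assumption: `require_pymongo` is a hypothetical helper, not a function in the Beam SDK, and the error text simply repeats the install command from the pull request description.

```python
try:
    import pymongo  # Present only when the optional extra is installed.
except ImportError:
    pymongo = None


def require_pymongo():
    """Returns the pymongo module or raises a helpful ImportError."""
    if pymongo is None:
        raise ImportError(
            'pymongo is not installed; try `pip install apache_beam[mongo]`')
    return pymongo
```

With this pattern, importing the package works without the driver, and the error appears only when a MongoDB code path is reached.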
[jira] [Created] (BEAM-9626) pymongo should be an optional requirement
Chad Dombrova created BEAM-9626: --- Summary: pymongo should be an optional requirement Key: BEAM-9626 URL: https://issues.apache.org/jira/browse/BEAM-9626 Project: Beam Issue Type: Improvement Components: sdk-py-core Reporter: Chad Dombrova Assignee: Chad Dombrova The pymongo driver is installed by default, but as the number of IO connectors in the python sdk grows, I don't think this is the precedent we want to set. We already have "extra" packages for gcp, aws, and interactive, we should also add one for mongo. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.
[ https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411491&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411491 ] ASF GitHub Bot logged work on BEAM-9577: Author: ASF GitHub Bot Created on: 27/Mar/20 23:24 Start Date: 27/Mar/20 23:24 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service. URL: https://github.com/apache/beam/pull/11203#discussion_r399580804 ## File path: model/job-management/src/main/proto/beam_artifact_api.proto ## @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi"; import "beam_runner_api.proto"; -// A service to stage artifacts for use in a Job. +// A service to retrieve artifacts for use in a Job. +service ArtifactRetrievalService { + // Resolves the given artifact reference into one or more simpler artifact + // references (e.g. a Maven dependency into a (transitive) set of jars. + // If no further simplification is possible, returns the original artifacts, + // at which point, all artifacts must be gettable. + rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse); + + // Retrieves the given artifact as a stream of bytes. + rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse); +} + +// A service that allows the client to act as an ArtifactRetrievalService, +// for a particular job with the server initiating requests and receiving +// responses. +// A client calls the service with an ArtifactResponseWrapper that has the +// staging token set, and thereafter responds to the server's requests. service ArtifactStagingService { + rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper) + returns (stream ArtifactRequestWrapper); +} + +// A request for artifact resolution. +message ResolveArtifactRequest { + repeated org.apache.beam.model.pipeline.v1.ArtifactInformation artifacts = 1; Review comment: Yes, updated the documentation. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411491) Time Spent: 2h 40m (was: 2.5h) > Update artifact staging and retrieval protocols to be dependency aware. > --- > > Key: BEAM-9577 > URL: https://issues.apache.org/jira/browse/BEAM-9577 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9331) The Row object needs better builders
[ https://issues.apache.org/jira/browse/BEAM-9331?focusedWorklogId=411482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411482 ] ASF GitHub Bot logged work on BEAM-9331: Author: ASF GitHub Bot Created on: 27/Mar/20 23:09 Start Date: 27/Mar/20 23:09 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10883: [BEAM-9331] Add better Row builders URL: https://github.com/apache/beam/pull/10883 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411482) Time Spent: 5h 10m (was: 5h) > The Row object needs better builders > > > Key: BEAM-9331 > URL: https://issues.apache.org/jira/browse/BEAM-9331 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > Users should be able to build a Row object by specifying field names. Desired > syntax: > > Row.withSchema(schema) > .withFieldName("field1", "value") > .withFieldName("field2.field3", value) > .build() > > Users should also have a builder that allows taking an existing row and > changing specific fields. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411479&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411479 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399573259 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding): }, } - def _get_encoded_output_coder(self, transform_node, window_value=True): -"""Returns the cloud encoding of the coder for the output of a transform.""" -from apache_beam.runners.portability.fn_api_runner.translations import \ - only_element -if len(transform_node.outputs) == 1: - output_tag = only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. + def _get_encoded_output_coder( + self, transform_node, window_value=True, output_tag=None): +"""Returns the cloud encoding of the coder for the output of a transform. + +If output_tag is not specified, we assume all outputs to have the same +encoding. +""" +from apache_beam.transforms.core import RunnerAPIPTransformHolder +external_transform = isinstance( +transform_node.transform, RunnerAPIPTransformHolder) +if external_transform and not output_tag: + raise Exception('For external transforms, output_tag must be specified') + +if not output_tag: + output_tag = next(iter(transform_node.outputs.keys())) + +element_type = None +if external_transform: element_type = transform_node.outputs[output_tag].element_type + # We perform special encoding for any external coders that cannot be + # parsed in Python SDK. 
+ if isinstance(element_type, ElementTypeHolder): +coder = self._get_encoding_for_external_environment(element_type) +if window_value: Review comment: Don't duplicate this logic, instead augment `_get_typehint_based_encoding` (well, really `coders.registry.get_coder`) to do the natural thing for ElementTypeHolders. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411479) Time Spent: 12.5h (was: 12h 20m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411462 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399561796 ## File path: sdks/python/apache_beam/coders/coders.py ## @@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context): except Exception: if context.allow_proto_holders: # ignore this typing scenario for now, since it can't be easily tracked -return RunnerAPICoderHolder(coder_proto) # type: ignore +return ExternalCoder( Review comment: I'd rather not do this for arbitrary exceptions, instead let's do this iff `coder_proto.spec.urn` is not in `cls._known_urns`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411462) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
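The suggestion above is to fall back to a placeholder coder based on an explicit check of the URN rather than on a broad `except Exception`. A rough sketch of that control flow with plain-Python stand-ins: `KNOWN_URNS`, `Placeholder`, and `coder_from_urn` are hypothetical and do not mirror the real `Coder.from_runner_api` signature, and the strings returned for known URNs merely stand in for coder objects.

```python
# Illustrative registry; the values stand in for real coder constructors.
KNOWN_URNS = {
    'beam:coder:bytes:v1': lambda: 'BytesCoder',
    'beam:coder:varint:v1': lambda: 'VarIntCoder',
}


class Placeholder(object):
    """Holds the URN of a coder this SDK cannot construct."""
    def __init__(self, urn):
        self.urn = urn


def coder_from_urn(urn, allow_placeholders=True):
    if urn not in KNOWN_URNS:
        # Unknown URN: the only case where a placeholder is returned.
        if allow_placeholders:
            return Placeholder(urn)
        raise ValueError('unknown coder urn: %s' % urn)
    # Known URN: construct normally, so genuine construction errors propagate.
    return KNOWN_URNS[urn]()
```

The difference from a catch-all handler is that a bug while constructing a known coder is no longer silently turned into a placeholder.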
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411461 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399564536 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) Review comment: This'll break if a java user names their side inputs coincidentally something that matches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411461) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
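To illustrate the concern, here is the removed `startswith('side')` check from the diff applied to tags that another SDK might plausibly choose. The tag names are made up for the example; the point is only that classifying inputs by name can misfire, whichever pattern is used.

```python
def is_side_input(tag):
    # The name-based check removed in the diff above.
    return tag.startswith('side')


# Hypothetical input tags; 'sidewalk_events' is a main input named by a user.
tags = ['side0', 'sidewalk_events', 'main']
misclassified = [t for t in tags if is_side_input(t) and t != 'side0']
print(misclassified)
```

Reading the side input tags from the ParDo payload, as the later part of the same diff does, avoids depending on naming conventions at all.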
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411469 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399566029 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils + from apache_beam.portability.api import beam_runner_api_pb2 + payload = ( + proto_utils.parse_Bytes( + proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) + for tag, si in payload.side_inputs.items(): Review comment: List comprehension? Or just
```
if ...:
  ...
  side_input_tags = list(payload.side_inputs.keys())
else:
  side_input_tags = []
```
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411469) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 50m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411460&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411460 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399563372 ## File path: sdks/python/apache_beam/coders/coders.py ## @@ -1383,22 +1375,74 @@ def from_runner_api_parameter(payload, components, context): write_state_threshold=int(payload)) -class RunnerAPICoderHolder(Coder): +class ElementTypeHolder(typehints.TypeConstraint): + """A dummy element type for external coders that cannot be parsed in Python""" + def __init__(self, coder, context): +self.coder = coder +self.context = context + + +class ExternalCoder(Coder): """A `Coder` that holds a runner API `Coder` proto. This is used for coders for which corresponding objects cannot be initialized in Python SDK. For example, coders for remote SDKs that may be available in Python SDK transform graph when expanding a cross-language transform. 
""" - def __init__(self, proto): -self._proto = proto - def proto(self): -return self._proto + coder_count = 0 - def to_runner_api(self, context): -return self._proto + def __init__(self, element_type_holder): +self.element_type_holder = element_type_holder - def to_type_hint(self): -return Any + def as_cloud_object(self, coders_context=None): +if not coders_context: + raise Exception( + 'coders_context must be specified to correctly encode external coders' + ) +coder_id = coders_context.get_by_proto( +self.element_type_holder.coder, deduplicate=True) + +coder_proto = self.element_type_holder.coder + +kind_str = 'kind:external' + str(ExternalCoder.coder_count) +ExternalCoder.coder_count = ExternalCoder.coder_count + 1 +component_encodings = [] +if coder_proto.spec.urn == 'beam:coder:kv:v1': Review comment: I anticipate `kind:stream` will be needed to handle GBK of unknown types. Others may be needed for other cases, or in the future, and it seems risky to enumerate them here and in the dataflow runner. There may also be cases where we have to go more than one level deep. We should try to return the same thing the external SDK would have returned just to be safe, and that means wrapping only the leaves as external coders. I think that'll clean stuff up as well (e.g. no need for `_coerce_to_kv_type_from_external_type`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411460) Time Spent: 11h 20m (was: 11h 10m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
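The idea of wrapping only the leaves could look roughly like this. A sketch using nested tuples in place of coder protos: `STRUCTURAL_URNS`, `KNOWN_LEAF_URNS`, `wrap_unknown_leaves`, and the `('external', urn)` marker are hypothetical, and the URN sets are an illustrative subset rather than the list the Dataflow runner actually understands.

```python
STRUCTURAL_URNS = {'beam:coder:kv:v1', 'beam:coder:iterable:v1'}
KNOWN_LEAF_URNS = {'beam:coder:bytes:v1', 'beam:coder:varint:v1'}


def wrap_unknown_leaves(coder):
    """coder is (urn, [component coders]); returns a tree of the same shape."""
    urn, components = coder
    if urn in STRUCTURAL_URNS:
        # Keep composite coders as they are and recurse, to any depth.
        return (urn, [wrap_unknown_leaves(c) for c in components])
    if urn in KNOWN_LEAF_URNS:
        return (urn, [])
    # Anything else is treated as an opaque leaf and wrapped as external.
    return ('external', urn)
```

Because the composite structure is preserved, a KV of an unknown type still appears as a KV to the runner, and no special case per composite URN is needed at the call site.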
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411467&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411467 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399569155 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils + from apache_beam.portability.api import beam_runner_api_pb2 + payload = ( + proto_utils.parse_Bytes( + proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) + for tag, si in payload.side_inputs.items(): +side_input_tags.append(tag) main_inputs = [ context.pcollections.get_by_id(id) for tag, -id in proto.inputs.items() if not is_side_input(tag) +id in proto.inputs.items() if tag not in side_input_tags ] -# Ordering is important here. -indexed_side_inputs = [ -(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag, -id in proto.inputs.items() if is_side_input(tag) -] +# Using a list here so that we can pass this into a function +# TODO: use nonlocal after fully migrated to Python3. 
+next_index = [0] + +def _get_sideinput_index(tag, next_index): + if is_python_side_input(tag): +return get_sideinput_index(tag) + else: +index = next_index[0] +next_index[0] = next_index[0] + 1 +return index + +# Ordering is important here for Python sideinputs. +indexed_side_inputs = [( +_get_sideinput_index(tag, next_index), +context.pcollections.get_by_id(id)) for tag, + id in proto.inputs.items() if tag in side_input_tags] side_inputs = [si for _, si in sorted(indexed_side_inputs)] + +input_tags_to_preserve = {} + transform = ptransform.PTransform.from_runner_api(proto, context) +if isinstance(transform, RunnerAPIPTransformHolder): + # For external transforms that are ParDos, we have to set side-inputs + # manually and preserve input tags. + transform.side_inputs = [pvalue.AsMultiMap(pc) for pc in side_inputs] Review comment: Is AsMultiMap always the right thing to use? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411467) Time Spent: 11h 50m (was: 11h 40m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 50m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
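The `next_index = [0]` idiom in the diff above is the usual workaround for Python 2 closures, which cannot rebind a variable of the enclosing scope. A minimal sketch of both forms follows; the factory names are hypothetical and are not part of Beam.

```python
def make_counter_list():
    # Python 2 and 3: the closure mutates a one-element list, so no
    # rebinding of the enclosing name is needed.
    next_index = [0]

    def take():
        index = next_index[0]
        next_index[0] += 1
        return index

    return take


def make_counter_nonlocal():
    # Python 3 only: `nonlocal` lets the closure rebind the enclosing name.
    next_index = 0

    def take():
        nonlocal next_index
        index = next_index
        next_index += 1
        return index

    return take
```

Both factories return a callable that hands out 0, 1, 2, ... on successive calls, which is the behaviour the TODO in the diff is aiming for once Python 2 support is dropped.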
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411464&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411464 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399565439 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils Review comment: Import at the top unless there's a reason (e.g. circular imports) that prevents it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411464) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411463&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411463 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399564877 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1110,7 +1117,8 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform] for part in self.parts ], inputs={ -tag: context.pcollections.get_id(pc) +_may_be_preserve_tag(tag, pc, self.input_tags_to_preserve): +context.pcollections.get_id(pc) for tag, pc in sorted(self.named_inputs().items()) Review comment: While we're here, did yapf not like this on the previous line? Maybe write `(tag, pc)` (unless it tries to split that too). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411463) Time Spent: 11.5h (was: 11h 20m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411466 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399563919 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1102,6 +1105,10 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform] (transform_urn in Pipeline.sdk_transforms_with_environment())): environment_id = context.default_environment_id() +def _may_be_preserve_tag(new_tag, pc, input_tags_to_preserve): Review comment: `maybe` is one word. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411466) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 40m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411470 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399571295 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding): }, } - def _get_encoded_output_coder(self, transform_node, window_value=True): -"""Returns the cloud encoding of the coder for the output of a transform.""" -from apache_beam.runners.portability.fn_api_runner.translations import \ - only_element -if len(transform_node.outputs) == 1: - output_tag = only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. + def _get_encoded_output_coder( + self, transform_node, window_value=True, output_tag=None): +"""Returns the cloud encoding of the coder for the output of a transform. + +If output_tag is not specified, we assume all outputs to have the same +encoding. +""" +from apache_beam.transforms.core import RunnerAPIPTransformHolder +external_transform = isinstance( +transform_node.transform, RunnerAPIPTransformHolder) +if external_transform and not output_tag: + raise Exception('For external transforms, output_tag must be specified') + +if not output_tag: + output_tag = next(iter(transform_node.outputs.keys())) Review comment: This is only legal if there is only one output, right? Given the logic, it seems it's never actually used. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411470) Time Spent: 12h (was: 11h 50m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411472 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399571052 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding): }, } - def _get_encoded_output_coder(self, transform_node, window_value=True): -"""Returns the cloud encoding of the coder for the output of a transform.""" -from apache_beam.runners.portability.fn_api_runner.translations import \ - only_element -if len(transform_node.outputs) == 1: - output_tag = only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. + def _get_encoded_output_coder( + self, transform_node, window_value=True, output_tag=None): +"""Returns the cloud encoding of the coder for the output of a transform. + +If output_tag is not specified, we assume all outputs to have the same +encoding. +""" +from apache_beam.transforms.core import RunnerAPIPTransformHolder Review comment: Ditto on import. (And any below.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411472) Time Spent: 12h 10m (was: 12h) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h 10m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411473 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399574133 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -925,8 +970,11 @@ def run_ParDo(self, transform_node, options): (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or use_unified_worker)): # Patch side input ids to be unique across a given pipeline. - if (label_renames and - transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn): + # This should not be done for external transforms since external SDKs may Review comment: It seems cleaner to simply not populate the label_renames map in this case rather than add a branch here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411473) Time Spent: 12h 20m (was: 12h 10m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411471 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399570138 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils + from apache_beam.portability.api import beam_runner_api_pb2 + payload = ( + proto_utils.parse_Bytes( + proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) + for tag, si in payload.side_inputs.items(): +side_input_tags.append(tag) main_inputs = [ context.pcollections.get_by_id(id) for tag, -id in proto.inputs.items() if not is_side_input(tag) +id in proto.inputs.items() if tag not in side_input_tags ] -# Ordering is important here. -indexed_side_inputs = [ -(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag, -id in proto.inputs.items() if is_side_input(tag) -] +# Using a list here so that we can pass this into a function +# TODO: use nonlocal after fully migrated to Python3. +next_index = [0] + +def _get_sideinput_index(tag, next_index): Review comment: It feels like this could result in duplicate indices if side one has tags named ['tag', 'side0', ...]. 
But maybe in that case it's OK? Please reference (create?) a JIRA about making side inputs always key-valued rather than having this kind of logic (here and elsewhere). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411471) Time Spent: 12h (was: 11h 50m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
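The duplicate-index concern above can be reproduced in isolation. This is only a sketch: the pattern below is a hypothetical stand-in, since the real `SIDE_INPUT_REGEX` is not shown in this thread, and `assign_indices` is an illustrative helper that mirrors the branching in `_get_sideinput_index`.

```python
import re

# Hypothetical pattern; the actual SIDE_INPUT_REGEX may differ.
SIDE_INPUT_REGEX = r'side([0-9]+)$'


def assign_indices(tags):
    """Pairs each tag with an index, mirroring the two branches in the diff."""
    next_index = 0
    result = []
    for tag in tags:
        match = re.match(SIDE_INPUT_REGEX, tag)
        if match:
            # Python-style tag: the index is parsed out of the tag itself.
            index = int(match.group(1))
        else:
            # Any other tag: take the next value of a running counter.
            index = next_index
            next_index += 1
        result.append((index, tag))
    return result
```

Under these assumptions, `assign_indices(['tag', 'side0'])` gives index 0 to both tags, because the counter and the parsed index are drawn from the same range. When two tuples share an index, `sorted` falls back to comparing their second elements.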
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411477 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399575610 ## File path: sdks/python/apache_beam/runners/pipeline_context.py ## @@ -115,9 +115,12 @@ def get_id_to_proto_map(self): # type: () -> Dict[str, message.Message] return self._id_to_proto - def put_proto(self, id, proto): + def get_proto_from_id(self, id): +return self.get_id_to_proto_map()[id] + + def put_proto(self, id, proto, ignore_duplicates=False): # type: (str, message.Message) -> str -if id in self._id_to_proto: +if not ignore_duplicates and id in self._id_to_proto: Review comment: If ignore_duplicates=True, should we ensure that the protos are the same? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411477) Time Spent: 12.5h (was: 12h 20m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
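One way to read the question above is that `ignore_duplicates=True` should tolerate re-registering an identical proto but still reject a conflicting one. A minimal sketch of that behaviour follows; `ProtoRegistry` is a hypothetical stand-in for the context class, and the error raised on a plain duplicate is assumed because the diff does not show it.

```python
class ProtoRegistry(object):
    """Hypothetical stand-in for the id-to-proto map; not Beam's class."""

    def __init__(self):
        self._id_to_proto = {}

    def put_proto(self, id, proto, ignore_duplicates=False):
        if id in self._id_to_proto:
            if not ignore_duplicates:
                # Assumed behaviour for a plain duplicate.
                raise ValueError("Id '%s' is already taken." % id)
            if self._id_to_proto[id] != proto:
                # Duplicates are tolerated only when the contents agree.
                raise ValueError(
                    "Id '%s' is registered with a different proto." % id)
            return id
        self._id_to_proto[id] = proto
        return id
```

The sketch relies on `!=` for the comparison, which works for any values that define equality.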
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411468&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411468 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399570500 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -558,6 +564,14 @@ def run_pipeline(self, pipeline, options): result.metric_results = self._metrics return result + def _get_encoding_for_external_environment(self, element): +if not isinstance(element, ElementTypeHolder): + raise Exception( + 'Expected to receive an object of type ElementTypeHolder ' + 'but received %r' % element) +from apache_beam.coders.coders import ExternalCoder Review comment: Can `apache_beam.coders.coders` not be imported at the top? In which case, it seems this whole method can be a one-liner at the call site. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411468) Time Spent: 11h 50m (was: 11h 40m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 50m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411478&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411478 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399574958 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -995,10 +1044,13 @@ def run_ParDo(self, transform_node, options): # The assumption here is that all outputs will have the same typehint # and coder as the main output. This is certainly the case right now # but conceivably it could change in the future. + encoding = self._get_encoded_output_coder( + transform_node, + output_tag=side_tag) if external_transform else step.encoding Review comment: Why do we need to branch on external_transform her? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411478) Time Spent: 12.5h (was: 12h 20m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411465&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411465 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399568963 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils + from apache_beam.portability.api import beam_runner_api_pb2 + payload = ( + proto_utils.parse_Bytes( + proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) + for tag, si in payload.side_inputs.items(): +side_input_tags.append(tag) main_inputs = [ context.pcollections.get_by_id(id) for tag, -id in proto.inputs.items() if not is_side_input(tag) +id in proto.inputs.items() if tag not in side_input_tags ] -# Ordering is important here. -indexed_side_inputs = [ -(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag, -id in proto.inputs.items() if is_side_input(tag) -] +# Using a list here so that we can pass this into a function +# TODO: use nonlocal after fully migrated to Python3. 
+next_index = [0] + +def _get_sideinput_index(tag, next_index): + if is_python_side_input(tag): +return get_sideinput_index(tag) + else: +index = next_index[0] +next_index[0] = next_index[0] + 1 +return index + +# Ordering is important here for Python sideinputs. +indexed_side_inputs = [( +_get_sideinput_index(tag, next_index), +context.pcollections.get_by_id(id)) for tag, + id in proto.inputs.items() if tag in side_input_tags] side_inputs = [si for _, si in sorted(indexed_side_inputs)] + +input_tags_to_preserve = {} Review comment: https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md?cl=head This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411465) Time Spent: 11h 40m (was: 11.5h) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 11h 40m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411474 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399569834 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -1128,29 +1136,67 @@ def from_runner_api(proto, # type: beam_runner_api_pb2.PTransform context # type: PipelineContext ): # type: (...) -> AppliedPTransform -def is_side_input(tag): +def is_python_side_input(tag): # type: (str) -> bool # As per named_inputs() above. - return tag.startswith('side') + return re.match(SIDE_INPUT_REGEX, tag) + +side_input_tags = [] +if common_urns.primitives.PAR_DO.urn == proto.spec.urn: + # Preserving side input tags. + from apache_beam.utils import proto_utils + from apache_beam.portability.api import beam_runner_api_pb2 + payload = ( + proto_utils.parse_Bytes( + proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) + for tag, si in payload.side_inputs.items(): +side_input_tags.append(tag) main_inputs = [ context.pcollections.get_by_id(id) for tag, -id in proto.inputs.items() if not is_side_input(tag) +id in proto.inputs.items() if tag not in side_input_tags ] -# Ordering is important here. -indexed_side_inputs = [ -(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag, -id in proto.inputs.items() if is_side_input(tag) -] +# Using a list here so that we can pass this into a function +# TODO: use nonlocal after fully migrated to Python3. 
+next_index = [0] + +def _get_sideinput_index(tag, next_index): + if is_python_side_input(tag): +return get_sideinput_index(tag) + else: +index = next_index[0] +next_index[0] = next_index[0] + 1 +return index + +# Ordering is important here for Python sideinputs. +indexed_side_inputs = [( +_get_sideinput_index(tag, next_index), +context.pcollections.get_by_id(id)) for tag, + id in proto.inputs.items() if tag in side_input_tags] side_inputs = [si for _, si in sorted(indexed_side_inputs)] + +input_tags_to_preserve = {} + transform = ptransform.PTransform.from_runner_api(proto, context) +if isinstance(transform, RunnerAPIPTransformHolder): Review comment: Is the sorting by python side input index needed at all in this path? If not, perhaps it should be in an else clause (where get_sideinput_index could be used unconditionally)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411474) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411476 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399572771 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding): }, } - def _get_encoded_output_coder(self, transform_node, window_value=True): -"""Returns the cloud encoding of the coder for the output of a transform.""" -from apache_beam.runners.portability.fn_api_runner.translations import \ - only_element -if len(transform_node.outputs) == 1: - output_tag = only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. + def _get_encoded_output_coder( + self, transform_node, window_value=True, output_tag=None): +"""Returns the cloud encoding of the coder for the output of a transform. + +If output_tag is not specified, we assume all outputs to have the same +encoding. 
+""" +from apache_beam.transforms.core import RunnerAPIPTransformHolder +external_transform = isinstance( +transform_node.transform, RunnerAPIPTransformHolder) +if external_transform and not output_tag: + raise Exception('For external transforms, output_tag must be specified') + +if not output_tag: + output_tag = next(iter(transform_node.outputs.keys())) + +element_type = None +if external_transform: Review comment: Seems the logic should be ``` if output_tag: element_type = transform_node.outputs[output_tag].element_type elif len(transform_node.outputs) == 1: element_type = only_element(transform_node.outputs).element_type elif external: raise "Tag required for external..." else: element_type = typehints.Any ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411476) Time Spent: 12.5h (was: 12h 20m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
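The branching proposed in the review comment above can be written out as a runnable sketch. Everything here is a stand-in: `Output`, `ANY`, and `resolve_element_type` are hypothetical names, `ANY` takes the place of `typehints.Any`, and `only_element` is reimplemented locally. The sketch also assumes `outputs` is a dict of tag to output and therefore applies `only_element` to its values.

```python
from collections import namedtuple

# Hypothetical stand-ins for a transform output and for typehints.Any.
Output = namedtuple('Output', ['element_type'])
ANY = object()


def only_element(iterable):
    # Unpacking fails unless the iterable holds exactly one item.
    element, = iterable
    return element


def resolve_element_type(outputs, output_tag=None, external=False):
    if output_tag:
        # An explicit tag always wins.
        return outputs[output_tag].element_type
    elif len(outputs) == 1:
        # A single output is unambiguous even without a tag.
        return only_element(outputs.values()).element_type
    elif external:
        # Several outputs on an external transform: the caller must choose.
        raise ValueError('For external transforms, output_tag must be specified')
    else:
        return ANY
```

With this ordering, an external transform with exactly one output no longer needs a tag, and the fallback type is used only for multi-output transforms that are not external.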
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411480&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411480 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399571350 ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding): }, } - def _get_encoded_output_coder(self, transform_node, window_value=True): -"""Returns the cloud encoding of the coder for the output of a transform.""" -from apache_beam.runners.portability.fn_api_runner.translations import \ - only_element -if len(transform_node.outputs) == 1: - output_tag = only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. + def _get_encoded_output_coder( + self, transform_node, window_value=True, output_tag=None): +"""Returns the cloud encoding of the coder for the output of a transform. + +If output_tag is not specified, we assume all outputs to have the same +encoding. +""" +from apache_beam.transforms.core import RunnerAPIPTransformHolder +external_transform = isinstance( +transform_node.transform, RunnerAPIPTransformHolder) +if external_transform and not output_tag: + raise Exception('For external transforms, output_tag must be specified') Review comment: Why? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411480) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12.5h > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411475&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411475 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 27/Mar/20 23:07 Start Date: 27/Mar/20 23:07 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11185: [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs URL: https://github.com/apache/beam/pull/11185#discussion_r399576434 ## File path: sdks/python/apache_beam/coders/coders.py ## @@ -1383,22 +1384,69 @@ def from_runner_api_parameter(payload, components, context): write_state_threshold=int(payload)) -class RunnerAPICoderHolder(Coder): +class ElementTypeHolder(typehints.TypeConstraint): Review comment: Should this be an ExternalElementType? Even better, perhaps this should be a CoderElementType that holds a Coder (external or not), and then an ExternalCoder would be a Coder that holds a proto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411475) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411449&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411449 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:43 Start Date: 27/Mar/20 22:43 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570655 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing( cache_token_generator=cache_token_generator) testing_bundle_manager.process_bundle(data_input, data_output) finally: -worker_handler.state.restore() + runner_execution_context.worker_handler_manager.state_servicer.restore() def _collect_written_timers_and_add_to_deferred_inputs( self, - pipeline_components, # type: beam_runner_api_pb2.Components - stage, # type: translations.Stage + runner_execution_context, # type: execution.FnApiRunnerExecutionContext bundle_context_manager, # type: execution.BundleContextManager deferred_inputs, # type: MutableMapping[str, PartitionableBuffer] - data_channel_coders, # type: Mapping[str, str] ): # type: (...) -> None -for transform_id, timer_writes in stage.timer_pcollections: +for transform_id, timer_writes in \ Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411449) Time Spent: 2h 20m (was: 2h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411448 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:43 Start Date: 27/Mar/20 22:43 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570601 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: I've just added the state_servicer() from your first comment, but I've created a Jira issue to track fixing that later on. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411448) Time Spent: 2h 10m (was: 2h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411444&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411444 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570204 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411444) Time Spent: 1.5h (was: 1h 20m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411447 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570251 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. 
+return next(iter(self.worker_handlers)).state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): +res = beam_fn_api_pb2.ProcessBundleDescriptor( +id=self.bundle_uid, +transforms={ +transform.unique_name: transform +for transform in self.stage.transforms +}, +pcollections=dict( +self.execution_context.pipeline_components.pcollections.items()), +
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411445 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570217 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411445) Time Spent: 1h 40m (was: 1.5h) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
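The "Properties that are lazily initialized" pattern in the quoted diff can be shown in isolation. The sketch below is a simplified illustration under assumptions: `LazyBundleContext` and `fake_factory` are hypothetical names, and the factory callable stands in for `worker_handler_manager.get_worker_handlers`.

```python
class LazyBundleContext(object):
  """Hypothetical, simplified stand-in for BundleContextManager."""
  def __init__(self, handler_factory, environment, num_workers):
    self._handler_factory = handler_factory
    self.environment = environment
    self.num_workers = num_workers
    # Filled in on first access, not at construction time.
    self._worker_handlers = None

  @property
  def worker_handlers(self):
    if self._worker_handlers is None:
      # Parentheses carry the line break; no backslash is needed.
      self._worker_handlers = self._handler_factory(
          self.environment, self.num_workers)
    return self._worker_handlers


calls = []  # records every factory invocation so the caching is observable


def fake_factory(environment, num_workers):
  calls.append((environment, num_workers))
  return ['handler-%d' % i for i in range(num_workers)]


ctx = LazyBundleContext(fake_factory, 'env', 2)
```

Constructing `ctx` does not call the factory; the first read of `ctx.worker_handlers` does, and later reads return the cached list. One caveat of using `None` as the sentinel is that a factory returning `None` would be called again on every access.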
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411446 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:42 Start Date: 27/Mar/20 22:42 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399570237 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411446) Time Spent: 1h 50m (was: 1h 40m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Est
[jira] [Updated] (BEAM-9625) StateServicer should be owned by FnApiRunnerContextManager
[ https://issues.apache.org/jira/browse/BEAM-9625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pablo Estrada updated BEAM-9625:
    Status: Open (was: Triage Needed)

> StateServicer should be owned by FnApiRunnerContextManager
> ----------------------------------------------------------
>
> Key: BEAM-9625
> URL: https://issues.apache.org/jira/browse/BEAM-9625
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Minor

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9625) StateServicer should be owned by FnApiRunnerContextManager
Pablo Estrada created BEAM-9625:
-----------------------------------

Summary: StateServicer should be owned by FnApiRunnerContextManager
Key: BEAM-9625
URL: https://issues.apache.org/jira/browse/BEAM-9625
Project: Beam
Issue Type: Sub-task
Components: sdk-py-core
Reporter: Pablo Estrada
Assignee: Pablo Estrada

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9624) Combine operation should support only converting to accumulators
Andrew Crites created BEAM-9624:
-----------------------------------

Summary: Combine operation should support only converting to accumulators
Key: BEAM-9624
URL: https://issues.apache.org/jira/browse/BEAM-9624
Project: Beam
Issue Type: Improvement
Components: runner-core
Reporter: Andrew Crites
Assignee: Andrew Crites

For streaming pipelines, we want to be able to lift the combiner into the MergeBuckets without having to also do a PartialGroupByKey before the shuffle. We don't want to do the PGBK since it could cause non-deterministic results when used with some triggers. We propose adding a new URN for doing just the convert to accumulators step and adding support for it in Java/Python/Go.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
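To make the proposed "convert to accumulators" step concrete, here is a minimal sketch in plain Python. It assumes the step wraps each input value in a fresh single-element accumulator without buffering or grouping anything; `MeanCombineFn` and `convert_to_accumulators` are hypothetical names that only mirror the shape of a Beam CombineFn, and nothing here is the actual URN implementation.

```python
class MeanCombineFn(object):
  """Hypothetical CombineFn-like class; the accumulator is (sum, count)."""
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, accumulator, value):
    total, count = accumulator
    return (total + value, count + 1)

  def merge_accumulators(self, accumulators):
    totals, counts = zip(*accumulators)
    return (sum(totals), sum(counts))

  def extract_output(self, accumulator):
    total, count = accumulator
    return total / count if count else float('nan')


def convert_to_accumulators(combine_fn, keyed_values):
  """Maps each (key, value) to (key, accumulator), one element at a time.

  Unlike a partial group-by-key, no elements are held back or pre-merged,
  so the output depends only on the individual input element.
  """
  for key, value in keyed_values:
    yield key, combine_fn.add_input(combine_fn.create_accumulator(), value)
```

Because every element is converted independently, merging can then happen entirely after the shuffle, which is the property the issue is after.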
[jira] [Work logged] (BEAM-9454) Add Deduplication transform for SDF
[ https://issues.apache.org/jira/browse/BEAM-9454?focusedWorklogId=411426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411426 ] ASF GitHub Bot logged work on BEAM-9454: Author: ASF GitHub Bot Created on: 27/Mar/20 22:06 Start Date: 27/Mar/20 22:06 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state URL: https://github.com/apache/beam/pull/11060#discussion_r399559906 ## File path: sdks/python/apache_beam/transforms/deduplicate.py ## @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +"""a collection of ptransforms for deduplicating elements.""" + +from __future__ import absolute_import +from __future__ import division + +import typing + +from apache_beam import typehints +from apache_beam.coders.coders import BooleanCoder +from apache_beam.transforms import core +from apache_beam.transforms import ptransform +from apache_beam.transforms import userstate +from apache_beam.transforms.timeutil import TimeDomain +from apache_beam.utils import timestamp + +__all__ = [ +'Deduplicate', +'DeduplicatePerKey', +] + +K = typing.TypeVar('K') +V = typing.TypeVar('V') + + +@typehints.with_input_types(typing.Tuple[K, V]) +@typehints.with_output_types(typing.Tuple[K, V]) +class DeduplicatePerKey(ptransform.PTransform): + """ A PTransform which deduplicates pair over a time domain and + threshold. Values in different windows will NOT be considered duplicates of + each other. Deduplication is best effort. + + The durations specified may impose memory and/or storage requirements within + a runner and care might need to be used to ensure that the deduplication time + limit is long enough to remove duplicates but short enough to not cause + performance problems within a runner. Each runner may provide an optimized + implementation of their choice using the deduplication time domain and + threshold specified. + + Does not preserve any order the input PCollection might have had. 
+ """ + def __init__(self, processing_time_duration=None, event_time_duration=None): +if processing_time_duration is None and event_time_duration is None: + raise ValueError( + 'DeduplicatePerKey requires at lease provide either' + 'processing_time_duration or event_time_duration.') +self.processing_time_duration = processing_time_duration +self.event_time_duration = event_time_duration + + def _create_deduplicate_fn(self): +processing_timer_spec = userstate.TimerSpec( +'processing_timer', TimeDomain.REAL_TIME) +event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK) +state_spec = userstate.BagStateSpec('seen', BooleanCoder()) Review comment: To follow up from what we discussed off-line, I think combining state semantically makes slightly more sense here, but as we're not adding to the state unless it's the first element there's no performance concerns, and I'm also fine with this as is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411426) Time Spent: 5h 40m (was: 5.5h) > Add Deduplication transform for SDF > --- > > Key: BEAM-9454 > URL: https://issues.apache.org/jira/browse/BEAM-9454 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core, sdk-py-core >Reporter: Boyuan Zhang >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > > When SDF is used as a source-like operation, it's necessary to provide a > default Deduplication transform for the SDF user to deduplicate values by > certain unique id. -- This message wa
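The per-key behaviour described in the quoted docstring (emit the first element for a key, drop repeats until a time threshold passes) can be illustrated without a runner. The sketch below is a plain-Python simulation under assumptions: `DedupPerKey` is a hypothetical class, timestamps are passed in explicitly, and the expiry check is done lazily on the next element instead of by a timer callback as in the Beam transform under review.

```python
class DedupPerKey(object):
  """Hypothetical single-process simulation of per-key deduplication."""
  def __init__(self, duration):
    self.duration = duration
    # key -> time at which the "seen" marker for that key expires
    self._expiry = {}

  def process(self, key, value, now):
    """Returns (key, value) for the first element per key, else None."""
    expiry = self._expiry.get(key)
    if expiry is not None and now < expiry:
      # Still inside the deduplication period: drop the element and,
      # as in the review discussion, do not write to state again.
      return None
    # First element, or the marker has expired: record it and emit.
    self._expiry[key] = now + self.duration
    return (key, value)
```

As a usage example, with `DedupPerKey(10)` an element for key `'k'` at time 0 is emitted, another at time 5 is dropped, and one at time 10 is emitted again because the marker has expired.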
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411419&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411419 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399555388 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment Review comment: Fix description. (Interestingly, I just did this change as well for another PR.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411419) Time Spent: 1h (was: 50m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411424&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411424 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399556279 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ Review comment: Prefer ()'s to backslashes for line breaks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411424) Time Spent: 1h 20m (was: 1h 10m) > Add context managers for FnApiRunner to manage execution of each bundle > --- > > Key: BEAM-9608 > URL: https://issues.apache.org/jira/browse/BEAM-9608 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
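The review comment above ("Prefer ()'s to backslashes for line breaks") can be illustrated with a small standalone sketch. The `Manager` and `Context` classes here are hypothetical stand-ins, not the Beam classes from the diff; only the line-continuation style is the point.

```python
class Manager:
    """Hypothetical stand-in for a worker handler manager."""

    def get_worker_handlers(self, environment, num_workers):
        return ["%s-%d" % (environment, i) for i in range(num_workers)]


class Context:
    """Hypothetical stand-in for an execution context."""

    def __init__(self):
        self.worker_handler_manager = Manager()


ctx = Context()

# Backslash continuation: valid, but fragile, since any trailing whitespace
# after the backslash turns it into a syntax error.
with_backslash = ctx.worker_handler_manager\
    .get_worker_handlers("env", 2)

# Parenthesized continuation: the style the reviewer asks for.
with_parens = (
    ctx.worker_handler_manager.get_worker_handlers("env", 2))
```

Both forms evaluate to the same value; the parenthesized one simply relies on implicit line joining inside brackets.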
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411421 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399556846 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() Review comment: I don't know how critical this is for performance (mostly for tests), but `self.worker_handlers[0]` might be preferable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 411421) Time Spent: 1h 10m (was: 1h)
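As a rough illustration of the suggestion to use `self.worker_handlers[0]` instead of `next(iter(self.worker_handlers))`: assuming the handlers are held in a list (the diff does not show the return type of `get_worker_handlers`, so this is an assumption), both expressions return the same element. The iterator form is only needed for containers that do not support indexing. The names below are illustrative.

```python
handlers = ["handler-a", "handler-b"]  # hypothetical list of worker handlers

first_via_iter = next(iter(handlers))  # works for any iterable
first_via_index = handlers[0]          # requires a sequence, reads more directly

# With a set, only the iterator form works; indexing raises TypeError.
handler_set = {"only-handler"}
first_from_set = next(iter(handler_set))
try:
    handler_set[0]
    indexable = True
except TypeError:
    indexable = False
```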
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411422 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399557974 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: Perhaps add a state_servicer() method right on runner_execution_context (and use several places below)? Issue Time Tracking --- Worklog Id: (was: 411422) Time Spent: 1h 20m (was: 1h 10m)
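One way to read the suggestion above is as a small delegating accessor on the execution context, so callers no longer spell out `worker_handler_manager.state_servicer` each time. The sketch below uses simplified, hypothetical classes (an in-memory `StateServicer` with `append_raw` and a made-up `get_raw`), and exposes the accessor as a property rather than a method; neither choice is confirmed by the PR.

```python
class StateServicer:
    """Hypothetical in-memory state servicer."""

    def __init__(self):
        self._state = {}

    def append_raw(self, key, data):
        self._state.setdefault(key, []).append(data)

    def get_raw(self, key):
        # Illustrative helper, not taken from the diff.
        return b"".join(self._state.get(key, []))


class WorkerHandlerManager:
    """Hypothetical manager that owns the state servicer."""

    def __init__(self):
        self.state_servicer = StateServicer()


class ExecutionContext:
    """Hypothetical execution context with a delegating accessor."""

    def __init__(self, worker_handler_manager):
        self.worker_handler_manager = worker_handler_manager

    @property
    def state_servicer(self):
        # The only place that knows where the servicer actually lives.
        return self.worker_handler_manager.state_servicer


context = ExecutionContext(WorkerHandlerManager())
context.state_servicer.append_raw("side-input", b"abc")
context.state_servicer.append_raw("side-input", b"def")
```

If ownership of the servicer later moves, only the accessor body needs to change.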
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411423 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399557293 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ## @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object): ``beam.PCollection``. """ def __init__(self, - worker_handler_factory, # type: Callable[[Optional[str], int], List[WorkerHandler]] + worker_handler_manager, # type: worker_handlers.WorkerHandlerManager pipeline_components, # type: beam_runner_api_pb2.Components safe_coders, data_channel_coders, ): """ -:param worker_handler_factory: A ``callable`` that takes in an environment +:param worker_handler_manager: A ``callable`` that takes in an environment id and a number of workers, and returns a list of ``WorkerHandler``s. :param pipeline_components: (beam_runner_api_pb2.Components): TODO :param safe_coders: :param data_channel_coders: """ self.pcoll_buffers = {} # type: MutableMapping[bytes, PartitionableBuffer] -self.worker_handler_factory = worker_handler_factory +self.worker_handler_manager = worker_handler_manager self.pipeline_components = pipeline_components self.safe_coders = safe_coders self.data_channel_coders = data_channel_coders +self.pipeline_context = pipeline_context.PipelineContext( +self.pipeline_components, +iterable_state_write=self._iterable_state_write) +self._last_uid = -1 + + def next_uid(self): +self._last_uid += 1 +return str(self._last_uid) + + def _iterable_state_write(self, values, element_coder_impl): +# type: (...) 
-> bytes +token = unique_name(None, 'iter').encode('ascii') +out = create_OutputStream() +for element in values: + element_coder_impl.encode_to_stream(element, out, True) +self.worker_handler_manager.state_servicer.append_raw( +beam_fn_api_pb2.StateKey( +runner=beam_fn_api_pb2.StateKey.Runner(key=token)), +out.get()) +return token + class BundleContextManager(object): def __init__(self, - execution_context, # type: FnApiRunnerExecutionContext - process_bundle_descriptor, # type: beam_fn_api_pb2.ProcessBundleDescriptor - worker_handler, # type: fn_runner.WorkerHandler - p_context, # type: pipeline_context.PipelineContext - ): + execution_context, # type: FnApiRunnerExecutionContext + stage, # type: translations.Stage + num_workers, # type: int + ): self.execution_context = execution_context -self.process_bundle_descriptor = process_bundle_descriptor -self.worker_handler = worker_handler -self.pipeline_context = p_context +self.stage = stage +self.bundle_uid = self.execution_context.next_uid() +self.num_workers = num_workers + +# Properties that are lazily initialized +self._process_bundle_descriptor = None +self._worker_handlers = None + + @property + def worker_handlers(self): +if self._worker_handlers is None: + self._worker_handlers = self.execution_context.worker_handler_manager\ +.get_worker_handlers(self.stage.environment, self.num_workers) +return self._worker_handlers + + def data_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. +return next(iter(self.worker_handlers)).data_api_service_descriptor() + + def state_api_service_descriptor(self): +# All worker_handlers share the same grpc server, so we can read grpc server +# info from any worker_handler and read from the first worker_handler. 
+return next(iter(self.worker_handlers)).state_api_service_descriptor() + + @property + def process_bundle_descriptor(self): +if self._process_bundle_descriptor is None: + self._process_bundle_descriptor = self._build_process_bundle_descriptor() +return self._process_bundle_descriptor + + def _build_process_bundle_descriptor(self): +res = beam_fn_api_pb2.ProcessBundleDescriptor( +id=self.bundle_uid, +transforms={ +transform.unique_name: transform +for transform in self.stage.transforms +}, +pcollections=dict( +self.execution_context.pipeline_components.pcollections.items()), +
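The "lazily initialized" properties in the quoted diff follow a common pattern: store `None` in a private attribute, then build the value on first access. A minimal standalone sketch is shown below; the class name, the dictionary descriptor, and the `build_count` counter are illustrative assumptions, not the Beam code.

```python
class BundleContext:
    """Hypothetical context that builds its descriptor on first use."""

    def __init__(self, bundle_uid):
        self.bundle_uid = bundle_uid
        self.build_count = 0      # only here to make the laziness observable
        self._descriptor = None   # lazily initialized

    @property
    def descriptor(self):
        if self._descriptor is None:
            self._descriptor = self._build_descriptor()
        return self._descriptor

    def _build_descriptor(self):
        self.build_count += 1
        return {"id": self.bundle_uid}


bundle = BundleContext("0")
builds_before_access = bundle.build_count
first = bundle.descriptor
second = bundle.descriptor
```

On Python 3.8 and later, `functools.cached_property` offers a similar effect with less code, although the diff quoted here targets a codebase that still used `object` base classes and type comments.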
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411420 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399558218 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing( cache_token_generator=cache_token_generator) testing_bundle_manager.process_bundle(data_input, data_output) finally: -worker_handler.state.restore() + runner_execution_context.worker_handler_manager.state_servicer.restore() def _collect_written_timers_and_add_to_deferred_inputs( self, - pipeline_components, # type: beam_runner_api_pb2.Components - stage, # type: translations.Stage + runner_execution_context, # type: execution.FnApiRunnerExecutionContext bundle_context_manager, # type: execution.BundleContextManager deferred_inputs, # type: MutableMapping[str, PartitionableBuffer] - data_channel_coders, # type: Mapping[str, str] ): # type: (...) -> None -for transform_id, timer_writes in stage.timer_pcollections: +for transform_id, timer_writes in \ Review comment: backslash This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 411420) Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411417 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: steveniemitz commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605334008 looks great! These tests will be really helpful to ensure we don't have more regressions in the future. Thanks! Issue Time Tracking --- Worklog Id: (was: 411417) Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle
[ https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411425 ] ASF GitHub Bot logged work on BEAM-9608: Author: ASF GitHub Bot Created on: 27/Mar/20 22:03 Start Date: 27/Mar/20 22:03 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner URL: https://github.com/apache/beam/pull/11229#discussion_r399558954 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ## @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self, state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=transform_id, side_input_id=tag, window=window)) - bundle_context_manager.worker_handler.state.append_raw( - state_key, elements_data) + runner_execution_context.worker_handler_manager.state_servicer\ Review comment: On second thought, perhaps the ownership of state servicer should be moved up to runner_execution_context (though the worker manager may need a reference). Your call. Issue Time Tracking --- Worklog Id: (was: 411425) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411414 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:58 Start Date: 27/Mar/20 21:58 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605332723 @steveniemitz tests added Issue Time Tracking --- Worklog Id: (was: 411414) Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable
[ https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411412 ] ASF GitHub Bot logged work on BEAM-8280: Author: ASF GitHub Bot Created on: 27/Mar/20 21:51 Start Date: 27/Mar/20 21:51 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11232: [BEAM-8280] Document Python 3 annotations support URL: https://github.com/apache/beam/pull/11232#discussion_r399554700 ## File path: website/src/documentation/sdks/python-type-safety.md ## @@ -64,22 +84,75 @@ To specify type hints as properties of a `DoFn` or `PTransform`, use the decorat The following code declares an `int` type hint on `FilterEvensDoFn`, using the decorator `@with_input_types()`. +```py +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:type_hints_do_fn %} ``` -{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:type_hints_do_fn %}``` Decorators receive an arbitrary number of positional and/or keyword arguments, typically interpreted in the context of the function they're wrapping. Generally the first argument is a type hint for the main input, and additional arguments are type hints for side inputs. -### Defining Generic Types +### Declaring Type Hints Using Annotations Review comment: "Declaring Type Hints Using Type Annotations" (might be a bit redundant, but is clearer). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411412) > re-enable IOTypeHints.from_callable > --- > > Key: BEAM-8280 > URL: https://issues.apache.org/jira/browse/BEAM-8280 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 11h 40m > Remaining Estimate: 0h > > See https://issues.apache.org/jira/browse/BEAM-8279 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable
[ https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411410 ] ASF GitHub Bot logged work on BEAM-8280: Author: ASF GitHub Bot Created on: 27/Mar/20 21:51 Start Date: 27/Mar/20 21:51 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11232: [BEAM-8280] Document Python 3 annotations support URL: https://github.com/apache/beam/pull/11232#discussion_r399553756 ## File path: website/src/documentation/sdks/python-type-safety.md ## @@ -23,38 +23,58 @@ Python is a dynamically-typed language with no static type checking. Because of The Apache Beam SDK for Python uses **type hints** during pipeline construction and runtime to try to emulate the correctness guarantees achieved by true static typing. Additionally, using type hints lays some groundwork that allows the backend service to perform efficient type deduction and registration of `Coder` objects. -Python version 3.5 introduces a module called **typing** to provide hints for type validators in the language. The Beam SDK for Python, based on Python version 2.7, implements a subset of [PEP 484](https://www.python.org/dev/peps/pep-0484/) and aims to follow it as closely as possible in its own typehints module. +Python version 3.5 introduces a module called **typing** to provide hints for type validators in the language. +The Beam SDK for Python implements a subset of [PEP 484](https://www.python.org/dev/peps/pep-0484/) and aims to follow it as closely as possible in its own typehints module. + +These flags control Beam type safety: +- `--no_pipeline_type_check` + + Disables type checking during pipeline construction. + Default is to perform these checks. +- `--runtime_type_check` + + Enables runtime type checking of every element. + This may affect pipeline performance, so the default is to skip these checks. 
## Benefits of Type Hints -The Beam SDK for Python includes some automatic type checking: for example, some `PTransform`s, such as `Create` and simple `ParDo` transforms, attempt to deduce their output type given their input. However, the Beam cannot infer types in all cases. Therefore, the recommendation is that you declare type hints to aid you in performing your own type checks if necessary. +The Beam SDK for Python includes some automatic type checking: for example, some `PTransforms`, such as `Create` and simple `ParDo` transforms, attempt to deduce their output type given their input. However, the Beam cannot infer types in all cases. Therefore, the recommendation is that you declare type hints to aid you in performing your own type checks if necessary. -When you use type hints, the runner raises exceptions during pipeline construction time, rather than runtime. For example, the runner generates an exception if it detects that your pipeline applies mismatched `PTransforms` (where the expected outputs of one transform do not match the expected inputs of the following transform). These exceptions are raised at pipeline construction time, regardless of where your pipeline will execute. Introducing type hints for the `PTransform`s you define allows you to catch potential bugs up front in the local runner, rather than after minutes of execution into a deep, complex pipeline. +When you use type hints, the runner raises exceptions during pipeline construction time, rather than runtime. For example, the runner generates an exception if it detects that your pipeline applies mismatched `PTransforms` (where the expected outputs of one transform do not match the expected inputs of the following transform). These exceptions are raised at pipeline construction time, regardless of where your pipeline will execute. 
Introducing type hints for the `PTransforms` you define allows you to catch potential bugs up front in the local runner, rather than after minutes of execution into a deep, complex pipeline. Review comment: I might lead with this paragraph, maybe even giving a very concrete example ("for example, giving an error when trying to apply a PTransform that expects a PCollection of strings to a PCollection of ints"). Then put the other paragraph about Beam not always being able to infer types next. Issue Time Tracking --- Worklog Id: (was: 411410) Time Spent: 11h 40m (was: 11.5h)
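The concrete example the reviewer asks for (a step that expects strings being applied to a collection of ints) can be sketched without Beam at all. The toy check below reads plain Python annotations with `typing.get_type_hints` and fails before any data is processed. It is not how Beam performs its checks: `check_chain` is a hypothetical helper, and it compares types by exact identity only.

```python
from typing import get_type_hints


def to_length(word: str) -> int:
    return len(word)


def shout(word: str) -> str:
    return word.upper()


def check_chain(producer, consumer):
    """Raise TypeError if producer's output type is not consumer's input type."""
    produced = get_type_hints(producer).get("return")
    consumer_hints = get_type_hints(consumer)
    # First annotated parameter of the consumer (hypothetical convention).
    consumed = next(
        hint for name, hint in consumer_hints.items() if name != "return")
    if produced is not consumed:
        raise TypeError(
            "%s produces %r but %s expects %r" %
            (producer.__name__, produced, consumer.__name__, consumed))


# str -> str feeding str -> int: accepted.
check_chain(shout, to_length)

# str -> int feeding a step that expects str: rejected up front.
try:
    check_chain(to_length, shout)
    mismatch_detected = False
except TypeError:
    mismatch_detected = True
```

The design point is the same one the documentation makes: the mismatch surfaces when the chain is assembled, not when elements flow through it.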
[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable
[ https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411411&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411411 ] ASF GitHub Bot logged work on BEAM-8280: Author: ASF GitHub Bot Created on: 27/Mar/20 21:51 Start Date: 27/Mar/20 21:51 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #11232: [BEAM-8280] Document Python 3 annotations support URL: https://github.com/apache/beam/pull/11232#discussion_r399554554 ## File path: website/src/documentation/sdks/python-type-safety.md ## @@ -64,22 +84,75 @@ To specify type hints as properties of a `DoFn` or `PTransform`, use the decorat The following code declares an `int` type hint on `FilterEvensDoFn`, using the decorator `@with_input_types()`. +```py +{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:type_hints_do_fn %} ``` -{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:type_hints_do_fn %}``` Decorators receive an arbitrary number of positional and/or keyword arguments, typically interpreted in the context of the function they're wrapping. Generally the first argument is a type hint for the main input, and additional arguments are type hints for side inputs. -### Defining Generic Types +### Declaring Type Hints Using Annotations Review comment: This should be the preferred way; put it first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 411411) Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (BEAM-9512) Anonymous structs have name collision in schema
[ https://issues.apache.org/jira/browse/BEAM-9512?focusedWorklogId=411409&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411409 ] ASF GitHub Bot logged work on BEAM-9512: Author: ASF GitHub Bot Created on: 27/Mar/20 21:49 Start Date: 27/Mar/20 21:49 Worklog Time Spent: 10m Work Description: apilloud commented on pull request #11255: [BEAM-9512] Map anonymous structs to schema URL: https://github.com/apache/beam/pull/11255 Map anonymous structs to schemas.
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411402 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:34 Start Date: 27/Mar/20 21:34 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605324797 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 411402) Time Spent: 2h 40m (was: 2.5h) > Error setting processing time timers near end-of-window > --- > > Key: BEAM-9557 > URL: https://issues.apache.org/jira/browse/BEAM-9557 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Steve Niemitz >Assignee: Reuven Lax >Priority: Critical > Fix For: 2.20.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > > Previously, it was possible to set a processing time timer past the end of a > window, and it would simply not fire. 
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of
> window 2020-03-19T17:59:59.999Z
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
> {code}
>
> I think the regression was introduced in commit
> a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message
> is wrong, it says that "event time timer" but the timer is in the processing
> time domain.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
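The behaviour described in the BEAM-9557 report above can be illustrated with a small model in plain Python. This is a sketch under stated assumptions, not the code from either pull request: `TimeDomain` and `check_timer` are hypothetical names, and the rule shown (only event-time timers are bounded by the window expiration, and the message names the actual domain) is one plausible reading of what the reporter expects. The timestamps are the ones quoted in the error message.

```python
from datetime import datetime, timezone
from enum import Enum


class TimeDomain(Enum):
    EVENT_TIME = "event time"
    PROCESSING_TIME = "processing time"


def check_timer(domain, target, window_expiration):
    """Hypothetical check: only event-time timers are bounded by the window.

    Returns True if the timer is accepted. Raises ValueError for an
    event-time timer set past the window expiration, and names the
    timer's actual domain in the message.
    """
    if domain is TimeDomain.EVENT_TIME and target > window_expiration:
        raise ValueError(
            f"Attempted to set {domain.value} timer for {target.isoformat()} "
            f"but that is after the expiration of window "
            f"{window_expiration.isoformat()}"
        )
    return True


# Timestamps taken from the error message quoted in the issue.
target = datetime(2020, 3, 19, 18, 1, 35, tzinfo=timezone.utc)
expiration = datetime(2020, 3, 19, 17, 59, 59, 999000, tzinfo=timezone.utc)

# A processing-time timer past the end of the window is accepted in this model.
accepted = check_timer(TimeDomain.PROCESSING_TIME, target, expiration)
```

In this model, calling `check_timer(TimeDomain.EVENT_TIME, target, expiration)` raises, because the target is about 95 seconds after the window expiration, while the processing-time call is accepted.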
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411401&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411401 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:34 Start Date: 27/Mar/20 21:34 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605324746 retest this please Issue Time Tracking --- Worklog Id: (was: 411401) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411403&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411403 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:34 Start Date: 27/Mar/20 21:34 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605324853 retest this please Issue Time Tracking --- Worklog Id: (was: 411403) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411398 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:30 Start Date: 27/Mar/20 21:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11252#issuecomment-605323570 retest this please Issue Time Tracking --- Worklog Id: (was: 411398) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411396&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411396 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:29 Start Date: 27/Mar/20 21:29 Worklog Time Spent: 10m Work Description: aaltay commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605323118 > @aaltay added new unit tests and fixed the broken one. However I'm noticing that test triggering seems broken on a number of PRs now. Is something wrong with the Jenkins master? Yes. Ongoing issue. @yifanzou is working on it. Repeating the test triggers a few times usually helps. Issue Time Tracking --- Worklog Id: (was: 411396) Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window
[ https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411393&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411393 ] ASF GitHub Bot logged work on BEAM-9557: Author: ASF GitHub Bot Created on: 27/Mar/20 21:25 Start Date: 27/Mar/20 21:25 Worklog Time Spent: 10m Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix timer window boundary checking URL: https://github.com/apache/beam/pull/11226#issuecomment-605321717 @aaltay added new unit tests and fixed the broken one. However I'm noticing that test triggering seems broken on a number of PRs now. Is something wrong with the Jenkins master? Issue Time Tracking --- Worklog Id: (was: 411393) Time Spent: 2h (was: 1h 50m)