[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411631&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411631
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401409
 
 
   Run PythonFormatter PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 411631)
Time Spent: 5h 20m  (was: 5h 10m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of window 2020-03-19T17:59:59.999Z
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0.  Also note that the error message 
> is wrong: it says "event time timer", but the timer is in the processing 
> time domain.
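
The behaviour the report asks for can be sketched as a boundary check that looks at the timer's time domain before comparing against the window. This is a minimal illustration only: the class, enum, and method names below are hypothetical, it is not the change made in PR #11252, and it assumes that only event-time targets should be compared with the window expiration.

```java
import java.time.Instant;

// Hypothetical sketch; these names are illustrative and are not Beam's API.
final class TimerBoundaryCheckSketch {
  enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

  /**
   * Returns true when the timer may be set.
   * Assumption: only event-time targets are checked against the window expiration.
   */
  static boolean isAllowed(TimeDomain domain, Instant target, Instant windowExpiration) {
    if (domain == TimeDomain.PROCESSING_TIME) {
      // A wall-clock target is not comparable to an event-time window bound,
      // so the timer is accepted (it may simply never fire).
      return true;
    }
    return !target.isAfter(windowExpiration);
  }

  /** Builds a rejection message that names the domain actually in use. */
  static String describeRejection(TimeDomain domain, Instant target, Instant windowExpiration) {
    String label = domain == TimeDomain.EVENT_TIME ? "event time" : "processing time";
    return "Attempted to set " + label + " timer for " + target
        + " but that is after the expiration of window " + windowExpiration;
  }

  public static void main(String[] args) {
    // Timestamps taken from the stack trace in the report.
    Instant target = Instant.parse("2020-03-19T18:01:35.000Z");
    Instant expiry = Instant.parse("2020-03-19T17:59:59.999Z");
    System.out.println(isAllowed(TimeDomain.PROCESSING_TIME, target, expiry)); // prints true
    System.out.println(isAllowed(TimeDomain.EVENT_TIME, target, expiry)); // prints false
  }
}
```

With the timestamps from the report, the processing-time case is accepted while the same target in the event-time domain is rejected, and the message helper avoids labelling a processing-time timer as an "event time timer".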



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411630&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411630
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401405
 
 
   Run Java_Examples_Dataflow PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411630)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411629
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401396
 
 
   Run CommunityMetrics PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411629)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411632
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:56
Start Date: 28/Mar/20 05:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401420
 
 
   Run PythonLint PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411632)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411626
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401262
 
 
   Run JavaBeamZetaSQL PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411626)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411625
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401246
 
 
   Run Go PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411625)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411627&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411627
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401285
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411627)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411628&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411628
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401312
 
 
   Run Website_Stage_GCS PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411628)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411624&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411624
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:54
Start Date: 28/Mar/20 05:54
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401231
 
 
   Run JavaPortabilityApi PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411624)
Time Spent: 4h 10m  (was: 4h)



[jira] [Updated] (BEAM-9198) BeamSQL aggregation analytics functionality

2020-03-27 Thread Rui Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Wang updated BEAM-9198:
---
Description: 
Mentor email: ruw...@google.com. Feel free to send emails for your questions.


Project Information
-
BeamSQL has a long list of aggregation/aggregation analytics functionalities 
to support. 


To begin with, you will need to support this syntax:

{code:sql}
analytic_function_name ( [ argument_list ] )
  OVER (
    [ PARTITION BY partition_expression_list ]
    [ ORDER BY expression [{ ASC | DESC }] [, ...] ]
    [ window_frame_clause ]
  )
{code}


As there is a long list of analytics functions, a good starting point is to 
support rank() first.
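
As a rough illustration of what rank() has to compute, the following sketch assigns standard SQL RANK() values to input that is already sorted: tied rows share a rank, and the rank after a tie skips ahead. The class and method names are hypothetical and this is plain Java, not BeamSQL code.

```java
import java.util.Arrays;

// Hypothetical sketch of RANK() semantics; not part of BeamSQL.
final class RankSketch {
  /** Returns SQL-style RANK() values for input that is already sorted ascending. */
  static int[] rank(int[] sorted) {
    int[] ranks = new int[sorted.length];
    for (int i = 0; i < sorted.length; i++) {
      // Tied rows reuse the rank of the previous row; a new value takes its
      // 1-based position, which leaves a gap after a run of ties.
      ranks[i] = (i > 0 && sorted[i] == sorted[i - 1]) ? ranks[i - 1] : i + 1;
    }
    return ranks;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(rank(new int[] {10, 20, 20, 30}))); // prints [1, 2, 2, 4]
  }
}
```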

This will require touching core components of BeamSQL:
1. SQL parser to support the syntax above.
2. SQL core to implement physical relational operator.
3. Distributed algorithms to implement a list of functions in a distributed 
manner. 
4. Enable in ZetaSQL dialect.
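
Items 2 and 3 above can be pictured, under heavy simplification, as a group-then-rank pipeline: rows are first grouped by the PARTITION BY key (the step a distributed runner would perform as a shuffle), then each group is ordered and ranked independently. The sketch below uses only in-memory Java collections; the names are hypothetical and nothing here is a Beam or BeamSQL API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of PARTITION BY + ORDER BY + rank(); not BeamSQL code.
final class PartitionedRankSketch {
  /** Maps each partition key to {value, rank} pairs, with values sorted ascending. */
  static Map<String, List<int[]>> rankByPartition(List<Map.Entry<String, Integer>> rows) {
    // Step 1: group rows by partition key (a stand-in for a distributed shuffle).
    Map<String, List<Integer>> groups = new TreeMap<>();
    for (Map.Entry<String, Integer> row : rows) {
      groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
    }
    // Step 2: order each partition and assign ranks independently of the others.
    Map<String, List<int[]>> result = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
      List<Integer> values = group.getValue();
      Collections.sort(values);
      List<int[]> ranked = new ArrayList<>();
      int rank = 0;
      for (int i = 0; i < values.size(); i++) {
        if (i == 0 || !values.get(i).equals(values.get(i - 1))) {
          rank = i + 1; // a new value takes its 1-based position; ties keep the old rank
        }
        ranked.add(new int[] {values.get(i), rank});
      }
      result.put(group.getKey(), ranked);
    }
    return result;
  }
}
```

Because each partition is ranked on its own, the per-partition work can run in parallel once the grouping is done; a very large single partition would still need a different strategy, which this sketch does not address.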


To understand what SQL analytics functionality is, you could check this great 
explanation doc: 
https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts.

To learn about Beam's programming model, check: 
https://beam.apache.org/documentation/programming-guide/#overview



  was:
Mentor email: ruw...@google.com. Feel free to send emails for your questions.


Project Information
-
BeamSQL has a long list of aggregation/aggregation analytics functionalities 
to support. 


To begin with, you will need to support this syntax:

{code:sql}
analytic_function_name ( [ argument_list ] )
  OVER (
    [ PARTITION BY partition_expression_list ]
    [ ORDER BY expression [{ ASC | DESC }] [, ...] ]
    [ window_frame_clause ]
  )
{code}


As there is a long list of analytics functions, a good starting point is to 
support rank() first.

This will require touching core components of BeamSQL:
1. SQL parser to support the syntax above.
2. SQL core to implement physical relational operator.
3. Distributed algorithms to implement a list of functions in a distributed 
manner. 
4. Build benchmarks to measure performance of your implementation.



To understand what SQL analytics functionality is, you could check this great 
explanation doc: 
https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts.

To know about Beam's programming model, check: 
https://beam.apache.org/documentation/programming-guide/#overview




> BeamSQL aggregation analytics functionality 
> 
>
> Key: BEAM-9198
> URL: https://issues.apache.org/jira/browse/BEAM-9198
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Rui Wang
> Priority: Major
> Labels: gsoc, gsoc2020, mentor
>





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411623&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411623
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:53
Start Date: 28/Mar/20 05:53
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605401221
 
 
   Run Website PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411623)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411622
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 05:47
Start Date: 28/Mar/20 05:47
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605400710
 
 
   Run SQL PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411622)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411615
 ]

ASF GitHub Bot logged work on BEAM-4374:


Author: ASF GitHub Bot
Created on: 28/Mar/20 04:55
Start Date: 28/Mar/20 04:55
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #11231: [BEAM-4374] 
Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#issuecomment-605395102
 
 
   R: @youngoli @lukecwik 
   
   I can't wait to get rid of both the legacy stuff and the older monitoring 
info listing. That function is becoming crufty.
 



Issue Time Tracking
---

Worklog Id: (was: 411615)
Time Spent: 35h 20m  (was: 35h 10m)

> Update existing metrics in the FN API to use new Metric Schema
> --
>
> Key: BEAM-4374
> URL: https://issues.apache.org/jira/browse/BEAM-4374
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Alex Amato
>Priority: Major
>  Time Spent: 35h 20m
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog 
> names+metadata





[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411614=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411614
 ]

ASF GitHub Bot logged work on BEAM-4374:


Author: ASF GitHub Bot
Created on: 28/Mar/20 04:55
Start Date: 28/Mar/20 04:55
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #11231: [BEAM-4374] 
Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#issuecomment-605395052
 
 
   Run Go Postcommit
 



Issue Time Tracking
---

Worklog Id: (was: 411614)
Time Spent: 35h 10m  (was: 35h)

> Update existing metrics in the FN API to use new Metric Schema
> --
>
> Key: BEAM-4374
> URL: https://issues.apache.org/jira/browse/BEAM-4374
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Alex Amato
>Priority: Major
>  Time Spent: 35h 10m
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog 
> names+metadata





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411603=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411603
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 04:17
Start Date: 28/Mar/20 04:17
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605392133
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411603)
Time Spent: 3h 40m  (was: 3.5h)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that 
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of 
> window 2020-03-19T17:59:59.999Z
> 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0.  Also notice that the error message 
> is wrong, it says that "event time timer" but the timer is in the processing 
> time domain.





[jira] [Work logged] (BEAM-9136) Add LICENSES and NOTICES to docker images

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9136?focusedWorklogId=411602=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411602
 ]

ASF GitHub Bot logged work on BEAM-9136:


Author: ASF GitHub Bot
Created on: 28/Mar/20 04:00
Start Date: 28/Mar/20 04:00
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #11246: 
[BEAM-9136]Add licenses for dependencies for Go
URL: https://github.com/apache/beam/pull/11246#discussion_r399615711
 
 

 ##
 File path: sdks/go/container/license_script.sh
 ##
 @@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+output_dir=third_party_licenses
+# remove output_dir if existing
+if [ -d "$output_dir" ]; then rm -rf $output_dir; fi
+
+# get go-licenses and run
+go get github.com/google/go-licenses
+$GOPATH/bin/go-licenses save "github.com/apache/beam/sdks/go/pkg/beam/" 
--save_path="$output_dir"
 
 Review comment:
   The Go SDK isn't currently up on all the best practices for Go. In 
particular, what you're running on your machine is probably not what Jenkins 
is pulling down, because versioning isn't being handled at present. It's not 
currently a priority to spend effort to resolve this state of affairs. It will 
not happen until after SDF and Beam Schemas have been added to the SDK later 
this year.
   
   I do not recommend spending time *now* to do this, but please keep 
sufficient notes so that we can resolve the Go situation later on.
 



Issue Time Tracking
---

Worklog Id: (was: 411602)
Time Spent: 10h 20m  (was: 10h 10m)

> Add LICENSES and NOTICES to docker images
> -
>
> Key: BEAM-9136
> URL: https://issues.apache.org/jira/browse/BEAM-9136
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> Scan dependencies and add licenses and notices of the dependencies to SDK 
> docker images.





[jira] [Work logged] (BEAM-9136) Add LICENSES and NOTICES to docker images

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9136?focusedWorklogId=411596=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411596
 ]

ASF GitHub Bot logged work on BEAM-9136:


Author: ASF GitHub Bot
Created on: 28/Mar/20 03:35
Start Date: 28/Mar/20 03:35
Worklog Time Spent: 10m 
  Work Description: tvalentyn commented on pull request #11246: 
[BEAM-9136]Add licenses for dependencies for Go
URL: https://github.com/apache/beam/pull/11246#discussion_r399613724
 
 

 ##
 File path: sdks/go/container/license_script.sh
 ##
 @@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+output_dir=third_party_licenses
+# remove output_dir if existing
+if [ -d "$output_dir" ]; then rm -rf $output_dir; fi
+
+# get go-licenses and run
+go get github.com/google/go-licenses
+$GOPATH/bin/go-licenses save "github.com/apache/beam/sdks/go/pkg/beam/" 
--save_path="$output_dir"
 
 Review comment:
   cc: @lostluck 
 



Issue Time Tracking
---

Worklog Id: (was: 411596)
Time Spent: 10h 10m  (was: 10h)

> Add LICENSES and NOTICES to docker images
> -
>
> Key: BEAM-9136
> URL: https://issues.apache.org/jira/browse/BEAM-9136
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Hannah Jiang
>Assignee: Hannah Jiang
>Priority: Major
>  Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> Scan dependencies and add licenses and notices of the dependencies to SDK 
> docker images.





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411590=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411590
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 28/Mar/20 02:37
Start Date: 28/Mar/20 02:37
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605383329
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411590)
Time Spent: 3.5h  (was: 3h 20m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that 
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of 
> window 2020-03-19T17:59:59.999Z
> 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0.  Also notice that the error message 
> is wrong, it says that "event time timer" but the timer is in the processing 
> time domain.





[jira] [Work logged] (BEAM-9446) FlinkRunner discards parallelism and execution_mode_for_batch pipeline options

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9446?focusedWorklogId=411589=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411589
 ]

ASF GitHub Bot logged work on BEAM-9446:


Author: ASF GitHub Bot
Created on: 28/Mar/20 01:54
Start Date: 28/Mar/20 01:54
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #11189: [BEAM-9446] 
Retain unknown arguments when using uber jar job server.
URL: https://github.com/apache/beam/pull/11189#discussion_r399604848
 
 

 ##
 File path: sdks/python/apache_beam/options/pipeline_options.py
 ##
 @@ -285,10 +289,29 @@ def get_all_options(
   cls._add_argparse_args(parser)  # pylint: disable=protected-access
 if add_extra_args_fn:
   add_extra_args_fn(parser)
+
 known_args, unknown_args = parser.parse_known_args(self._flags)
-if unknown_args:
-  _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
-result = vars(known_args)
+if retain_unknown_options:
+  i = 0
+  while i < len(unknown_args):
+# Treat all unary flags as booleans, and all binary argument values as
+# strings.
+if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
+  split = unknown_args[i].rsplit('=')
 
 Review comment:
   Oh, I see what you mean now. I will commit your suggestion to avoid 
confusion. However, I don't think we should support unrecognized `append` 
arguments. I don't want to have to infer complex argument types. As for the 
specific case of `experiments`, that will not be a problem because 
`experiments` is already recognized by the parser.
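
   The policy described here (unary flags become booleans, everything else 
stays a string, no `append` inference) can be sketched as follows. This is an 
illustration under assumptions, not the code in the pull request: 
`retain_unknown_args` is a hypothetical helper, and splitting on the first `=` 
only is a choice made for the example.

```python
def retain_unknown_args(unknown_args):
    """Hypothetical helper: map leftover argv tokens to a dict.

    Flags with no value become True; `--key=value` and `--key value`
    are kept as strings. A repeated flag overwrites the earlier value,
    so list-valued (append) arguments are deliberately not inferred.
    """
    result = {}
    i = 0
    while i < len(unknown_args):
        token = unknown_args[i]
        if not token.startswith('-'):
            # Stray value with no preceding flag; skip it.
            i += 1
            continue
        name = token.lstrip('-')
        if '=' in name:
            # Split on the first '=' so the value may itself contain '='.
            key, value = name.split('=', 1)
            result[key] = value
            i += 1
        elif i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
            # No value follows: treat as a boolean flag.
            result[name] = True
            i += 1
        else:
            result[name] = unknown_args[i + 1]
            i += 2
    return result


print(retain_unknown_args(['--foo', '--bar=baz', '--qux', 'val']))
```

   One limitation of this sketch: a value that starts with `-`, such as a 
negative number, is read as the next flag.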
 



Issue Time Tracking
---

Worklog Id: (was: 411589)
Time Spent: 5h 10m  (was: 5h)

> FlinkRunner discards parallelism and execution_mode_for_batch pipeline options
> --
>
> Key: BEAM-9446
> URL: https://issues.apache.org/jira/browse/BEAM-9446
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Kyle Weaver
>Assignee: Kyle Weaver
>Priority: Major
>  Labels: portability-flink
> Fix For: 2.21.0
>
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> I need these options for TFX, but they're being discarded (I believe they are 
> normally supplied by the job server).





[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411587=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411587
 ]

ASF GitHub Bot logged work on BEAM-4374:


Author: ASF GitHub Bot
Created on: 28/Mar/20 01:35
Start Date: 28/Mar/20 01:35
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #11231: [BEAM-4374] 
Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#issuecomment-605375733
 
 
   Run Go Postcommit
 



Issue Time Tracking
---

Worklog Id: (was: 411587)
Time Spent: 35h  (was: 34h 50m)

> Update existing metrics in the FN API to use new Metric Schema
> --
>
> Key: BEAM-4374
> URL: https://issues.apache.org/jira/browse/BEAM-4374
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Alex Amato
>Priority: Major
>  Time Spent: 35h
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog 
> names+metadata





[jira] [Work logged] (BEAM-4374) Update existing metrics in the FN API to use new Metric Schema

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-4374?focusedWorklogId=411586=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411586
 ]

ASF GitHub Bot logged work on BEAM-4374:


Author: ASF GitHub Bot
Created on: 28/Mar/20 01:34
Start Date: 28/Mar/20 01:34
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #11231: [BEAM-4374] 
Shortids for the Go SDK
URL: https://github.com/apache/beam/pull/11231#discussion_r399602340
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
 ##
 @@ -16,20 +16,165 @@
 package harness
 
 import (
+   "bytes"
+   "strconv"
+   "sync"
+   "sync/atomic"
"time"
 
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
ppb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/ptypes"
 )
 
-func monitoring(p *exec.Plan) (*fnpb.Metrics, []*ppb.MonitoringInfo) {
+type mUrn uint32
+type mType uint32
+
+// TODO: Pull these from the protos.
+var sUrns = []string{
+   "beam:metric:user:v1",
+   "beam:metric:element_count:v1",
+   "beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+   "beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+   "beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+   "beam:metric:ptransform_progress:remaining:v1",
+   "beam:metric:ptransform_progress:completed:v1",
+
+   "TestingSentinelUrn", // Must remain last.
+}
+
+const (
+   urnUser mUrn = iota
+   urnElementCount
+   urnStartBundle
+   urnProcessBundle
+   urnFinishBundle
+   urnProgressRemaining
+   urnProgressCompleted
+
+   urnTestSentinel // Must remain last.
+)
+
+var sTypes = []string{
+   "beam:metrics:sum_int64:v1",
+   "beam:metrics:sum_double:v1",
+   "beam:metrics:distribution_int64:v1",
+   "beam:metrics:distribution_double:v1",
+   "beam:metrics:latest_int64:v1",
+   "beam:metrics:latest_double:v1",
+   "beam:metrics:top_n_int64:v1",
+   "beam:metrics:top_n_double:v1",
+   "beam:metrics:bottom_n_int64:v1",
+   "beam:metrics:bottom_n_double:v1",
+   "beam:metrics:monitoring_table:v1",
+   "beam:metrics:progress:v1",
+
+   "TestingSentinelType", // Must remain last.
+}
+
+const (
 
 Review comment:
   Ack. I've kept the extra marker type around for the moment, but I might 
collapse things into the function to simplify some things.
 



Issue Time Tracking
---

Worklog Id: (was: 411586)
Time Spent: 34h 50m  (was: 34h 40m)

> Update existing metrics in the FN API to use new Metric Schema
> --
>
> Key: BEAM-4374
> URL: https://issues.apache.org/jira/browse/BEAM-4374
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Alex Amato
>Priority: Major
>  Time Spent: 34h 50m
>  Remaining Estimate: 0h
>
> Update existing metrics to use the new proto and cataloging schema defined in:
> [_https://s.apache.org/beam-fn-api-metrics_]
>  * Check in new protos
>  * Define catalog file for metrics
>  * Port existing metrics to use this new format, based on catalog 
> names+metadata





[jira] [Commented] (BEAM-9626) pymongo should be an optional requirement

2020-03-27 Thread Ahmet Altay (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17069151#comment-17069151
 ] 

Ahmet Altay commented on BEAM-9626:
---

Why do we want to make this change, especially retroactively, for an IO that is 
already in the core package? What about the other IOs?

I do not think we want to set a precedent for moving packages out of Beam 
core unless there is a deeper issue. Also, specifically for Python IOs, the 
growth rate outside of the gcp package has not been significant so far.

> pymongo should be an optional requirement
> -
>
> Key: BEAM-9626
> URL: https://issues.apache.org/jira/browse/BEAM-9626
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Chad Dombrova
>Assignee: Chad Dombrova
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The pymongo driver is installed by default, but as the number of IO 
> connectors in the python sdk grows, I don't think this is the precedent we 
> want to set.  We already have "extra" packages for gcp, aws, and interactive, 
> we should also add one for mongo. 
>  





[jira] [Created] (BEAM-9627) KafkaIO needs better support for SSL

2020-03-27 Thread Daniel Mills (Jira)
Daniel Mills created BEAM-9627:
--

 Summary: KafkaIO needs better support for SSL
 Key: BEAM-9627
 URL: https://issues.apache.org/jira/browse/BEAM-9627
 Project: Beam
  Issue Type: Improvement
  Components: io-java-kafka
Reporter: Daniel Mills


Configuring SSL for kafka requires pointing an option at local files containing 
keys and roots of trust as described here: 
[https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/]

Currently, it is somewhat tricky to ensure that these files are written before 
KafkaIO starts reading from the source; one potential option would be to add an 
init hook where the user could download keys from the keystore of their choice 
and write them to local files.





[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411550=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411550
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591425
 
 

 ##
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 
 Review comment:
   Why do you have to specify the tag_coder_impl? Shouldn't it always be 
Python's UTF-8 string coder?
 



Issue Time Tracking
---

Worklog Id: (was: 411550)
Time Spent: 6h 10m  (was: 6h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411558=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411558
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591742
 
 

 ##
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 self._timestamp_coder_impl = TimestampCoderImpl()
-self._payload_coder_impl = payload_coder_impl
+self._boolean_coder_impl = BooleanCoderImpl()
+self._pane_info_coder_impl = PaneInfoCoderImpl()
+self._key_coder_impl = key_coder_impl
+self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
 # type: (dict, create_OutputStream, bool) -> None
-self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
+self._tag_coder_impl.encode_to_stream(
+value.dynamic_timer_tag, out, True)
+self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+if not value.clear_bit:
+  self._timestamp_coder_impl.encode_to_stream(
+  value.fire_timestamp, out, True)
+  self._timestamp_coder_impl.encode_to_stream(
+  value.hold_timestamp, out, True)
+  self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+  self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
 
   def decode_from_stream(self, in_stream, nested):
 # type: (create_InputStream, bool) -> dict
 # TODO(robertwb): Consider using a concrete class rather than a dict here.
-return dict(
-timestamp=self._timestamp_coder_impl.decode_from_stream(
+from apache_beam.transforms import userstate
+user_key = self._key_coder_impl.decode_from_stream(in_stream, nested)
 
 Review comment:
   ```suggestion
   user_key = self._key_coder_impl.decode_from_stream(in_stream, True)
   ```
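
   For context, a toy sketch of why every field needs a self-delimiting 
(nested) encoding when several fields share one stream: the decoder has to 
know where the key ends before it can read the tag and the clear bit. This is 
not Beam's wire format. The 4-byte length prefix and the names `encode_timer` 
and `decode_timer` are assumptions for illustration, and windows and pane 
info are omitted.

```python
import io
import struct


def _write_bytes(out, data):
    # Length prefix makes the field self-delimiting ("nested").
    out.write(struct.pack('>I', len(data)))
    out.write(data)


def _read_bytes(inp):
    (n,) = struct.unpack('>I', inp.read(4))
    return inp.read(n)


def encode_timer(key, tag, clear, fire_ms=0, hold_ms=0):
    out = io.BytesIO()
    _write_bytes(out, key)
    _write_bytes(out, tag.encode('utf-8'))
    out.write(b'\x01' if clear else b'\x00')
    if not clear:
        # Timestamps are only written when the timer is being set.
        out.write(struct.pack('>qq', fire_ms, hold_ms))
    return out.getvalue()


def decode_timer(data):
    inp = io.BytesIO(data)
    key = _read_bytes(inp)
    tag = _read_bytes(inp).decode('utf-8')
    clear = inp.read(1) == b'\x01'
    if clear:
        return {'key': key, 'tag': tag, 'clear': True}
    fire_ms, hold_ms = struct.unpack('>qq', inp.read(16))
    return {'key': key, 'tag': tag, 'clear': False,
            'fire_ms': fire_ms, 'hold_ms': hold_ms}


print(decode_timer(encode_timer(b'user-key', 'tag', False, 1000, 2000)))
```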
 



Issue Time Tracking
---

Worklog Id: (was: 411558)
Time Spent: 6.5h  (was: 6h 20m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411546=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411546
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399587568
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform pTransform) {
 ((KvCoder) mainInput.getCoder()).getKeyCoder(),
 // TODO: Add support for timer payloads to the SDK
 // We currently assume that all payloads are unspecified.
-Timer.Coder.of(VoidCoder.of(;
+Timer.Coder.of(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE)));
 
 Review comment:
   ```suggestion
   Timer.Coder.of(((KvCoder) 
mainInput.getCoder()).getKeyCoder(), GlobalWindow.Coder.INSTANCE)));
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411546)
Time Spent: 5h 50m  (was: 5h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411557=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411557
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589246
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -73,59 +107,81 @@
*/
   public static class Coder extends StructuredCoder> {
 
-public static  Coder of(org.apache.beam.sdk.coders.Coder 
payloadCoder) {
-  return new Coder(payloadCoder);
+public static  Coder of(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  return new Coder<>(keyCoder, windowCoder);
 }
 
-private final org.apache.beam.sdk.coders.Coder payloadCoder;
+private final org.apache.beam.sdk.coders.Coder keyCoder;
+private final org.apache.beam.sdk.coders.Coder>
+windowsCoder;
 
-private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) {
-  this.payloadCoder = payloadCoder;
+private Coder(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  this.keyCoder = keyCoder;
+  this.windowsCoder = (org.apache.beam.sdk.coders.Coder) 
CollectionCoder.of(windowCoder);
+  ;
 }
 
 @Override
 public void encode(Timer timer, OutputStream outStream) throws 
CoderException, IOException {
-  InstantCoder.of().encode(timer.getTimestamp(), outStream);
-  payloadCoder.encode(timer.getPayload(), outStream);
+  keyCoder.encode(timer.getUserKey(), outStream);
+  StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+  BooleanCoder.of().encode(timer.getClearBit(), outStream);
+  if (!timer.getClearBit()) {
+InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+windowsCoder.encode(timer.getWindows(), outStream);
+PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+  }
 }
 
 @Override
 public Timer decode(InputStream inStream) throws CoderException, 
IOException {
-  Instant instant = InstantCoder.of().decode(inStream);
-  T value = payloadCoder.decode(inStream);
-  return Timer.of(instant, value);
+  T userKey = keyCoder.decode(inStream);
+  String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+  Boolean clearBit = BooleanCoder.of().decode(inStream);
 
 Review comment:
   ```suggestion
 boolean clearBit = BooleanCoder.of().decode(inStream);
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411557)
Time Spent: 6h 20m  (was: 6h 10m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411555=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411555
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591519
 
 

 ##
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 self._timestamp_coder_impl = TimestampCoderImpl()
-self._payload_coder_impl = payload_coder_impl
+self._boolean_coder_impl = BooleanCoderImpl()
+self._pane_info_coder_impl = PaneInfoCoderImpl()
+self._key_coder_impl = key_coder_impl
+self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
 # type: (dict, create_OutputStream, bool) -> None
-self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
+self._tag_coder_impl.encode_to_stream(
+value.dynamic_timer_tag, out, True)
+self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+if not value.clear_bit:
+  self._timestamp_coder_impl.encode_to_stream(
+  value.fire_timestamp, out, True)
+  self._timestamp_coder_impl.encode_to_stream(
+  value.hold_timestamp, out, True)
+  self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+  self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
 
 Review comment:
   ```suggestion
 self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, nested)
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411555)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411548=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411548
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588039
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,14 +49,26 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a timer for the given timestamp with a user specified payload.
 
 Review comment:
   ```suggestion
  * Returns a timer for the given timestamp with a user specified key.
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411548)
Time Spent: 6h  (was: 5h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411552=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411552
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589633
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -73,59 +107,81 @@
*/
   public static class Coder extends StructuredCoder> {
 
-public static  Coder of(org.apache.beam.sdk.coders.Coder 
payloadCoder) {
-  return new Coder(payloadCoder);
+public static  Coder of(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  return new Coder<>(keyCoder, windowCoder);
 }
 
-private final org.apache.beam.sdk.coders.Coder payloadCoder;
+private final org.apache.beam.sdk.coders.Coder keyCoder;
+private final org.apache.beam.sdk.coders.Coder>
+windowsCoder;
 
-private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) {
-  this.payloadCoder = payloadCoder;
+private Coder(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  this.keyCoder = keyCoder;
+  this.windowsCoder = (org.apache.beam.sdk.coders.Coder) 
CollectionCoder.of(windowCoder);
+  ;
 }
 
 @Override
 public void encode(Timer timer, OutputStream outStream) throws 
CoderException, IOException {
-  InstantCoder.of().encode(timer.getTimestamp(), outStream);
-  payloadCoder.encode(timer.getPayload(), outStream);
+  keyCoder.encode(timer.getUserKey(), outStream);
+  StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+  BooleanCoder.of().encode(timer.getClearBit(), outStream);
+  if (!timer.getClearBit()) {
+InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+windowsCoder.encode(timer.getWindows(), outStream);
+PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+  }
 }
 
 @Override
 public Timer decode(InputStream inStream) throws CoderException, 
IOException {
-  Instant instant = InstantCoder.of().decode(inStream);
-  T value = payloadCoder.decode(inStream);
-  return Timer.of(instant, value);
+  T userKey = keyCoder.decode(inStream);
+  String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+  Boolean clearBit = BooleanCoder.of().decode(inStream);
+  if (clearBit) {
+return Timer.of(userKey, dynamicTimerTag, clearBit);
+  }
+  Instant fireTimestamp = InstantCoder.of().decode(inStream);
+  Instant holeTimestamp = InstantCoder.of().decode(inStream);
+  Collection windows = 
windowsCoder.decode(inStream);
+  PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
+  return Timer.of(
+  userKey, dynamicTimerTag, clearBit, fireTimestamp, holeTimestamp, 
windows, pane);
 }
 
 @Override
 public List> 
getCoderArguments() {
-  return Collections.singletonList(payloadCoder);
-}
-
-@Override
-public void verifyDeterministic() throws NonDeterministicException {
-  verifyDeterministic(this, "Payload coder must be deterministic", 
payloadCoder);
+  return Collections.singletonList(keyCoder);
 }
 
 @Override
-public boolean consistentWithEquals() {
-  return payloadCoder.consistentWithEquals();
+public List> getComponents() 
{
+  return Arrays.asList(keyCoder, windowsCoder);
 }
 
 @Override
-public Object structuralValue(Timer value) {
-  return Timer.of(value.getTimestamp(), 
payloadCoder.structuralValue(value.getPayload()));
-}
-
-@Override
-public boolean isRegisterByteSizeObserverCheap(Timer value) {
-  return payloadCoder.isRegisterByteSizeObserverCheap(value.getPayload());
+public void verifyDeterministic() throws NonDeterministicException {
+  verifyDeterministic(this, "UserKey coder must be deterministic", 
keyCoder);
+  verifyDeterministic(this, "Windows coder must be deterministic", 
windowsCoder);
 
 Review comment:
   ```suggestion
 verifyDeterministic(this, "Window coder must be deterministic", windowsCoder);
   ```
 


[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411554=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411554
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588782
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -73,59 +107,81 @@
*/
   public static class Coder extends StructuredCoder> {
 
 Review comment:
   please update class comment
 



Issue Time Tracking
---

Worklog Id: (was: 411554)
Time Spent: 6h 10m  (was: 6h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411551=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411551
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399591496
 
 

 ##
 File path: sdks/python/apache_beam/coders/coder_impl.py
 ##
 @@ -629,22 +629,56 @@ def estimate_size(self, unused_value, nested=False):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl, tag_coder_impl):
 self._timestamp_coder_impl = TimestampCoderImpl()
-self._payload_coder_impl = payload_coder_impl
+self._boolean_coder_impl = BooleanCoderImpl()
+self._pane_info_coder_impl = PaneInfoCoderImpl()
+self._key_coder_impl = key_coder_impl
+self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+self._tag_coder_impl = tag_coder_impl
 
   def encode_to_stream(self, value, out, nested):
 # type: (dict, create_OutputStream, bool) -> None
-self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+self._key_coder_impl.encode_to_stream(value.user_key, out, nested)
 
 Review comment:
   ```suggestion
   self._key_coder_impl.encode_to_stream(value.user_key, out, True)
   ```
   
   We should ignore the nested field since it will change the encoding.
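
To illustrate the point about `nested`, here is a toy sketch in plain Python. It is not Beam's `coder_impl` code: `encode_bytes` and `encode_timer_prefix` are hypothetical names, and the fixed 4-byte length prefix is an assumption standing in for whatever prefix a real coder writes. It only shows that forwarding the caller's `nested` flag to the first component makes the same key produce different bytes.

```python
import struct


def encode_bytes(value, nested):
    """Toy byte-string coder: length-prefix the value only when nested."""
    # Assumption: a 4-byte big-endian length stands in for a real coder's prefix.
    if nested:
        return struct.pack(">I", len(value)) + value
    return value


def encode_timer_prefix(user_key, tag, key_nested):
    """Encode the key, then the tag; the tag is always written as nested."""
    return encode_bytes(user_key, key_nested) + encode_bytes(tag, True)


# Same key and tag, but the key is encoded with two different flags.
always_nested = encode_timer_prefix(b"k1", b"tag", key_nested=True)
outer_flag = encode_timer_prefix(b"k1", b"tag", key_nested=False)

print(always_nested)
print(outer_flag)
print(always_nested == outer_flag)
```

Hard-coding `True` for every component, as suggested, keeps the timer's byte layout independent of where the timer coder itself is used.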
 



Issue Time Tracking
---

Worklog Id: (was: 411551)
Time Spent: 6h 10m  (was: 6h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411549=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411549
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588665
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -57,11 +78,24 @@
* org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is 
associated with this
* timer.
*/
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract T getUserKey();
+
+  public abstract String getDynamicTimerTag();
+
+  public abstract Boolean getClearBit();
 
 Review comment:
   ```suggestion
 public abstract boolean getClearBit();
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411549)
Time Spent: 6h  (was: 5h 50m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411556=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411556
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399589315
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -73,59 +107,81 @@
*/
   public static class Coder extends StructuredCoder> {
 
-public static  Coder of(org.apache.beam.sdk.coders.Coder 
payloadCoder) {
-  return new Coder(payloadCoder);
+public static  Coder of(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  return new Coder<>(keyCoder, windowCoder);
 }
 
-private final org.apache.beam.sdk.coders.Coder payloadCoder;
+private final org.apache.beam.sdk.coders.Coder keyCoder;
+private final org.apache.beam.sdk.coders.Coder>
+windowsCoder;
 
-private Coder(org.apache.beam.sdk.coders.Coder payloadCoder) {
-  this.payloadCoder = payloadCoder;
+private Coder(
+org.apache.beam.sdk.coders.Coder keyCoder,
+org.apache.beam.sdk.coders.Coder windowCoder) 
{
+  this.keyCoder = keyCoder;
+  this.windowsCoder = (org.apache.beam.sdk.coders.Coder) 
CollectionCoder.of(windowCoder);
+  ;
 }
 
 @Override
 public void encode(Timer timer, OutputStream outStream) throws 
CoderException, IOException {
-  InstantCoder.of().encode(timer.getTimestamp(), outStream);
-  payloadCoder.encode(timer.getPayload(), outStream);
+  keyCoder.encode(timer.getUserKey(), outStream);
+  StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+  BooleanCoder.of().encode(timer.getClearBit(), outStream);
+  if (!timer.getClearBit()) {
+InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+windowsCoder.encode(timer.getWindows(), outStream);
+PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+  }
 }
 
 @Override
 public Timer decode(InputStream inStream) throws CoderException, 
IOException {
-  Instant instant = InstantCoder.of().decode(inStream);
-  T value = payloadCoder.decode(inStream);
-  return Timer.of(instant, value);
+  T userKey = keyCoder.decode(inStream);
+  String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+  Boolean clearBit = BooleanCoder.of().decode(inStream);
+  if (clearBit) {
+return Timer.of(userKey, dynamicTimerTag, clearBit);
+  }
+  Instant fireTimestamp = InstantCoder.of().decode(inStream);
+  Instant holeTimestamp = InstantCoder.of().decode(inStream);
 
 Review comment:
   ```suggestion
 Instant holdTimestamp = InstantCoder.of().decode(inStream);
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411556)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411553=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411553
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399588611
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 ##
 @@ -40,14 +49,26 @@
 @AutoValue
 public abstract class Timer {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer of(Instant time) {
-return of(time, (Void) null);
+  /**
+   * Returns a timer for the given timestamp with a user specified payload.
+   *
+   * @return
+   */
+  // TODO(BEAM-9562): Plumb through actual Timer fields.
+  public static  Timer of(
 
 Review comment:
   May I suggest you create these two static methods for creating the two common cases?
   .of(userKey, dynamicTimerTag, fireTimestamp, holdTimestamp, windows, pane);
   .cleared(userKey, dynamicTimerTag);
   
   This way you can make sure that dynamicTimerTag, fireTimestamp, holdTimestamp, windows and pane are not null.
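
The two-factory idea can be sketched as follows. This is a Python analogue for illustration only, not the Java `Timer` class under review; the class name `TimerSketch` and its field types are assumptions. `of` requires every field of a set timer, while `cleared` needs only the key and the tag.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class TimerSketch:
    """Hypothetical stand-in for a timer value with a clear bit."""
    user_key: Any
    dynamic_timer_tag: str
    clear_bit: bool
    fire_timestamp: Optional[int] = None
    hold_timestamp: Optional[int] = None
    windows: Optional[Tuple[Any, ...]] = None
    pane: Optional[Any] = None

    @classmethod
    def of(cls, user_key, dynamic_timer_tag, fire_timestamp, hold_timestamp,
           windows, pane):
        """Build a set (non-cleared) timer; every non-key field is required."""
        required = {
            "dynamic_timer_tag": dynamic_timer_tag,
            "fire_timestamp": fire_timestamp,
            "hold_timestamp": hold_timestamp,
            "windows": windows,
            "pane": pane,
        }
        for name, value in required.items():
            if value is None:
                raise ValueError(f"{name} must not be None")
        return cls(user_key, dynamic_timer_tag, False, fire_timestamp,
                   hold_timestamp, tuple(windows), pane)

    @classmethod
    def cleared(cls, user_key, dynamic_timer_tag):
        """Build a cleared timer; only the key and the tag are meaningful."""
        if dynamic_timer_tag is None:
            raise ValueError("dynamic_timer_tag must not be None")
        return cls(user_key, dynamic_timer_tag, True)
```

Because callers never pass the clear bit themselves, a cleared timer cannot carry stray timestamps and a set timer cannot be missing them.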
 



Issue Time Tracking
---

Worklog Id: (was: 411553)
Time Spent: 6h 10m  (was: 6h)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9562) Remove timer from PCollection and treat timers as Elements

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9562?focusedWorklogId=411547=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411547
 ]

ASF GitHub Bot logged work on BEAM-9562:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:22
Start Date: 28/Mar/20 00:22
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #11199: 
[BEAM-9562][WIP] Update Timer encoding with respect of dynamic timers
URL: https://github.com/apache/beam/pull/11199#discussion_r399587616
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##
 @@ -197,7 +199,7 @@ public boolean canTranslate(PTransform pTransform) {
 ((KvCoder) mainInput.getCoder()).getKeyCoder(),
 // TODO: Add support for timer payloads to the SDK
 // We currently assume that all payloads are unspecified.
 
 Review comment:
   ```suggestion
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411547)
Time Spent: 5h 50m  (was: 5h 40m)

> Remove timer from PCollection and treat timers as Elements 
> ---
>
> Key: BEAM-9562
> URL: https://issues.apache.org/jira/browse/BEAM-9562
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-harness
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8466) Python typehints: pep 484 warn and strict modes

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8466?focusedWorklogId=411524=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411524
 ]

ASF GitHub Bot logged work on BEAM-8466:


Author: ASF GitHub Bot
Created on: 28/Mar/20 00:00
Start Date: 28/Mar/20 00:00
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #11240: [BEAM-8466] Make 
strip_iterable more strict
URL: https://github.com/apache/beam/pull/11240#issuecomment-605361748
 
 
   I would like to test this internally before merging
 



Issue Time Tracking
---

Worklog Id: (was: 411524)
Time Spent: 1h 40m  (was: 1.5h)

> Python typehints: pep 484 warn and strict modes
> ---
>
> Key: BEAM-8466
> URL: https://issues.apache.org/jira/browse/BEAM-8466
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Allow type checking to use PEP 484 type hints in two modes: one that only 
> warns if there are errors, and another that raises exceptions on errors.
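
A rough sketch of the two modes described in the issue, in plain Python. This is not Beam's type-checking code: `check_hint`, `TypeCheckMode`, and the use of `isinstance` against a plain class are all assumptions made for illustration.

```python
import enum
import warnings


class TypeCheckMode(enum.Enum):
    """Hypothetical switch between the warn and strict behaviours."""
    WARN = "warn"
    STRICT = "strict"


def check_hint(value, expected_type, mode):
    """Return True on a match; otherwise warn or raise, depending on mode."""
    if isinstance(value, expected_type):
        return True
    message = f"expected {expected_type.__name__}, got {type(value).__name__}"
    if mode is TypeCheckMode.STRICT:
        # Strict mode: a mismatch is an error.
        raise TypeError(message)
    # Warn mode: report the mismatch but let the caller continue.
    warnings.warn(message, stacklevel=2)
    return False
```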





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411523=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411523
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:57
Start Date: 27/Mar/20 23:57
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605361234
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411523)
Time Spent: 3h 20m  (was: 3h 10m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that 
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of 
> window 2020-03-19T17:59:59.999Z
> 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0.  Also notice that the error message 
> is wrong: it says "event time timer", but the timer is in the processing 
> time domain.





[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411518=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411518
 ]

ASF GitHub Bot logged work on BEAM-9626:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:43
Start Date: 27/Mar/20 23:43
Worklog Time Spent: 10m 
  Work Description: chadrik commented on issue #11256: [BEAM-9626] Make 
pymongo an optional requirement
URL: https://github.com/apache/beam/pull/11256#issuecomment-605358697
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411518)
Time Spent: 0.5h  (was: 20m)

> pymongo should be an optional requirement
> -
>
> Key: BEAM-9626
> URL: https://issues.apache.org/jira/browse/BEAM-9626
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Chad Dombrova
>Assignee: Chad Dombrova
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The pymongo driver is installed by default, but as the number of IO 
> connectors in the Python SDK grows, I don't think this is the precedent we 
> want to set.  We already have "extra" packages for gcp, aws, and interactive; 
> we should also add one for mongo. 
>  
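
One common way to make a driver optional is to import it lazily and fail with an install hint. The sketch below assumes that pattern; `require_optional` is a hypothetical helper, and the extra name passed to it is whatever the package ends up defining, not something confirmed by this thread.

```python
import importlib


def require_optional(module_name, extra):
    """Import an optional dependency or fail with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # The extra name is supplied by the caller; it is illustrative only.
        raise ImportError(
            f"{module_name} is required for this connector; "
            f"install the '{extra}' extra to use it"
        ) from exc
```

A connector would call `require_optional("pymongo", "mongo")` when it is first used, so users who never touch it do not need the driver installed.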





[jira] [Work logged] (BEAM-8292) Add a Reshuffle PTransform preventing fusion of the surrounding transforms

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8292?focusedWorklogId=411508=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411508
 ]

ASF GitHub Bot logged work on BEAM-8292:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:40
Start Date: 27/Mar/20 23:40
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #11197: [BEAM-8292] 
Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 411508)
Time Spent: 4h  (was: 3h 50m)

> Add a Reshuffle PTransform preventing fusion of the surrounding transforms
> --
>
> Key: BEAM-8292
> URL: https://issues.apache.org/jira/browse/BEAM-8292
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-go
>Reporter: John Patoch
>Assignee: Robert Burke
>Priority: Minor
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Reshuffle is a PTransform that takes a PCollection and shuffles the data 
> to help increase parallelism.
> Reshuffle adds a temporary random key to each element, performs a
>  GroupByKey, and finally removes the temporary key.
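The three steps in the description above (add a temporary random key, group by key, drop the key) can be sketched in plain Python. This is only an in-memory illustration and not the Go SDK implementation from the PR; the function name `reshuffle` and the `num_buckets` and `seed` parameters are hypothetical.

```python
import random
from collections import defaultdict


def reshuffle(elements, num_buckets=8, seed=None):
    """In-memory illustration of the Reshuffle idea (hypothetical helper)."""
    rng = random.Random(seed)

    # Step 1: pair every element with a temporary random key.
    keyed = [(rng.randrange(num_buckets), element) for element in elements]

    # Step 2: group by the temporary key (a stand-in for GroupByKey).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)

    # Step 3: drop the temporary key and emit the elements again.
    return [element for key in sorted(groups) for element in groups[key]]
```

The output contains exactly the input elements, possibly in a different order. In a real runner the grouping step is what forces a materialization point and so prevents fusion of the surrounding transforms.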



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411505
 ]

ASF GitHub Bot logged work on BEAM-9577:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:35
Start Date: 27/Mar/20 23:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11203: [BEAM-9577] 
Define and implement dependency-aware artifact staging service.
URL: https://github.com/apache/beam/pull/11203#discussion_r399583281
 
 

 ##
 File path: model/job-management/src/main/proto/beam_artifact_api.proto
 ##
 @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi";
 
 import "beam_runner_api.proto";
 
-// A service to stage artifacts for use in a Job.
+// A service to retrieve artifacts for use in a Job.
+service ArtifactRetrievalService {
+  // Resolves the given artifact reference into one or more simpler artifact
+  // references (e.g. a Maven dependency into a (transitive) set of jars.
+  // If no further simplification is possible, returns the original artifacts,
 
 Review comment:
   Clarified in the docs.
 



Issue Time Tracking
---

Worklog Id: (was: 411505)
Time Spent: 3h  (was: 2h 50m)

> Update artifact staging and retrieval protocols to be dependency aware.
> ---
>
> Key: BEAM-9577
> URL: https://issues.apache.org/jira/browse/BEAM-9577
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411503
 ]

ASF GitHub Bot logged work on BEAM-9577:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:35
Start Date: 27/Mar/20 23:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11203: [BEAM-9577] 
Define and implement dependency-aware artifact staging service.
URL: https://github.com/apache/beam/pull/11203#discussion_r399581813
 
 

 ##
 File path: model/job-management/src/main/proto/beam_artifact_api.proto
 ##
 @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi";
 
 import "beam_runner_api.proto";
 
-// A service to stage artifacts for use in a Job.
+// A service to retrieve artifacts for use in a Job.
+service ArtifactRetrievalService {
+  // Resolves the given artifact reference into one or more simpler artifact
+  // references (e.g. a Maven dependency into a (transitive) set of jars.
+  // If no further simplification is possible, returns the original artifacts,
+  // at which point, all artifacts must be gettable.
+  rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse);
+
+  // Retrieves the given artifact as a stream of bytes.
+  rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse);
+}
+
+// A service that allows the client to act as an ArtifactRetrievalService,
+// for a particular job with the server initiating requests and receiving
+// responses.
+// A client calls the service with an ArtifactResponseWrapper that has the
+// staging token set, and thereafter responds to the server's requests.
 service ArtifactStagingService {
+  rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper)
 
 Review comment:
   Good question. As discussed offline, multiple requests can be pushed before 
waiting for a response, so we should be able to fill the network buffer. Either 
the server or client can choose to parallelize on its backend store as needed. 
   
   I added an error field to the resolve_artifacts field as a runner may 
attempt to opportunistically consolidate environments, and then fall back to 
doing each separately. 
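The fallback mentioned in this comment can be sketched as follows. Everything here is hypothetical: `resolve` stands in for a ResolveArtifact call and is assumed to return a `(replacements, error)` pair, and `environments` is assumed to map an environment name to its list of artifacts. The thread does not show how a runner actually does this.

```python
def resolve_with_fallback(resolve, environments):
    """Try to resolve all environments together, else one at a time.

    resolve: callable taking a list of artifacts and returning a
        (replacements, error) pair, where error is "" on success.
    environments: dict mapping environment name -> list of artifacts.
    """
    # Opportunistic attempt: consolidate every environment into one request.
    merged = [a for artifacts in environments.values() for a in artifacts]
    replacements, error = resolve(merged)
    if not error:
        return {name: replacements for name in environments}

    # Fallback: the consolidated request failed, so resolve each separately.
    results = {}
    for name, artifacts in environments.items():
        replacements, error = resolve(artifacts)
        if error:
            raise RuntimeError("cannot resolve %s: %s" % (name, error))
        results[name] = replacements
    return results
```

Reporting the failure as a field in the response, rather than failing the whole stream, is what lets the caller continue with the per-environment requests.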
 



Issue Time Tracking
---

Worklog Id: (was: 411503)
Time Spent: 2h 50m  (was: 2h 40m)

> Update artifact staging and retrieval protocols to be dependency aware.
> ---
>
> Key: BEAM-9577
> URL: https://issues.apache.org/jira/browse/BEAM-9577
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411504
 ]

ASF GitHub Bot logged work on BEAM-9577:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:35
Start Date: 27/Mar/20 23:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11203: [BEAM-9577] 
Define and implement dependency-aware artifact staging service.
URL: https://github.com/apache/beam/pull/11203#discussion_r399582853
 
 

 ##
 File path: model/job-management/src/main/proto/beam_artifact_api.proto
 ##
 @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi";
 
 import "beam_runner_api.proto";
 
-// A service to stage artifacts for use in a Job.
+// A service to retrieve artifacts for use in a Job.
+service ArtifactRetrievalService {
+  // Resolves the given artifact reference into one or more simpler artifact
+  // references (e.g. a Maven dependency into a (transitive) set of jars.
+  // If no further simplification is possible, returns the original artifacts,
+  // at which point, all artifacts must be gettable.
+  rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse);
+
+  // Retrieves the given artifact as a stream of bytes.
+  rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse);
+}
+
+// A service that allows the client to act as an ArtifactRetrievalService,
+// for a particular job with the server initiating requests and receiving
+// responses.
+// A client calls the service with an ArtifactResponseWrapper that has the
+// staging token set, and thereafter responds to the server's requests.
 service ArtifactStagingService {
+  rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper)
+  returns (stream ArtifactRequestWrapper);
+}
+
+// A request for artifact resolution.
+message ResolveArtifactRequest {
+  repeated org.apache.beam.model.pipeline.v1.ArtifactInformation artifacts = 1;
 
 Review comment:
   The documentation has been expanded. 
 



Issue Time Tracking
---

Worklog Id: (was: 411504)
Time Spent: 2h 50m  (was: 2h 40m)

> Update artifact staging and retrieval protocols to be dependency aware.
> ---
>
> Key: BEAM-9577
> URL: https://issues.apache.org/jira/browse/BEAM-9577
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411493
 ]

ASF GitHub Bot logged work on BEAM-9626:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:28
Start Date: 27/Mar/20 23:28
Worklog Time Spent: 10m 
  Work Description: chadrik commented on issue #11256: [BEAM-9626] Make 
pymongo an optional requirement
URL: https://github.com/apache/beam/pull/11256#issuecomment-605355110
 
 
   R: @aaltay 
   R: @kamilwu 
   R: @y1chi 
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 411493)
Time Spent: 20m  (was: 10m)

> pymongo should be an optional requirement
> -
>
> Key: BEAM-9626
> URL: https://issues.apache.org/jira/browse/BEAM-9626
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Reporter: Chad Dombrova
>Assignee: Chad Dombrova
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The pymongo driver is installed by default, but as the number of IO 
> connectors in the Python SDK grows, I don't think this is the precedent we 
> want to set.  We already have "extra" packages for gcp, aws, and interactive; 
> we should also add one for mongo. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9626) pymongo should be an optional requirement

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9626?focusedWorklogId=411492&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411492
 ]

ASF GitHub Bot logged work on BEAM-9626:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:26
Start Date: 27/Mar/20 23:26
Worklog Time Spent: 10m 
  Work Description: chadrik commented on pull request #11256: [BEAM-9626] 
Make pymongo an optional requirement
URL: https://github.com/apache/beam/pull/11256
 
 
   The pymongo driver is installed by default, but as the number of IO 
connectors in the Python SDK grows, I don't think this is the precedent we want 
to set.  We already have "extra" packages for gcp, aws, and interactive; we 
should also add one for mongo. 
   
   After this PR is merged, the mongo deps can be installed using `pip install 
apache_beam[mongo]`
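Once a dependency moves into an extra, code that uses it usually has to guard the import. A minimal sketch of that pattern follows; the helper names `optional_import` and `require` are hypothetical and are not taken from the PR, which may handle the missing import differently.

```python
import importlib


def optional_import(module_name):
    """Return the imported module, or None if it is not installed."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


def require(module, module_name, extra):
    """Fail with an actionable message if an optional module is missing."""
    if module is None:
        raise ImportError(
            "%s is not installed; try: pip install apache_beam[%s]"
            % (module_name, extra))
    return module


# Importing the IO module stays cheap: nothing fails until the
# connector is actually used and require() is called.
pymongo = optional_import("pymongo")
```

The point of deferring the failure to `require()` is that users who never touch the mongo connector can import the SDK without having pymongo installed.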
   
   
   
   

[jira] [Created] (BEAM-9626) pymongo should be an optional requirement

2020-03-27 Thread Chad Dombrova (Jira)
Chad Dombrova created BEAM-9626:
---

 Summary: pymongo should be an optional requirement
 Key: BEAM-9626
 URL: https://issues.apache.org/jira/browse/BEAM-9626
 Project: Beam
  Issue Type: Improvement
  Components: sdk-py-core
Reporter: Chad Dombrova
Assignee: Chad Dombrova


The pymongo driver is installed by default, but as the number of IO connectors 
in the Python SDK grows, I don't think this is the precedent we want to set.  
We already have "extra" packages for gcp, aws, and interactive; we should also 
add one for mongo. 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9577) Update artifact staging and retrieval protocols to be dependency aware.

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=411491&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411491
 ]

ASF GitHub Bot logged work on BEAM-9577:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:24
Start Date: 27/Mar/20 23:24
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11203: [BEAM-9577] 
Define and implement dependency-aware artifact staging service.
URL: https://github.com/apache/beam/pull/11203#discussion_r399580804
 
 

 ##
 File path: model/job-management/src/main/proto/beam_artifact_api.proto
 ##
 @@ -31,8 +31,77 @@ option java_outer_classname = "ArtifactApi";
 
 import "beam_runner_api.proto";
 
-// A service to stage artifacts for use in a Job.
+// A service to retrieve artifacts for use in a Job.
+service ArtifactRetrievalService {
+  // Resolves the given artifact reference into one or more simpler artifact
+  // references (e.g. a Maven dependency into a (transitive) set of jars.
+  // If no further simplification is possible, returns the original artifacts,
+  // at which point, all artifacts must be gettable.
+  rpc ResolveArtifact(ResolveArtifactRequest) returns (ResolveArtifactResponse);
+
+  // Retrieves the given artifact as a stream of bytes.
+  rpc GetArtifact(GetArtifactRequest) returns (stream GetArtifactResponse);
+}
+
+// A service that allows the client to act as an ArtifactRetrievalService,
+// for a particular job with the server initiating requests and receiving
+// responses.
+// A client calls the service with an ArtifactResponseWrapper that has the
+// staging token set, and thereafter responds to the server's requests.
 service ArtifactStagingService {
+  rpc ReverseArtifactRetrievalService(stream ArtifactResponseWrapper)
+  returns (stream ArtifactRequestWrapper);
+}
+
+// A request for artifact resolution.
+message ResolveArtifactRequest {
+  repeated org.apache.beam.model.pipeline.v1.ArtifactInformation artifacts = 1;
 
 Review comment:
   Yes, updated the documentation. 
 



Issue Time Tracking
---

Worklog Id: (was: 411491)
Time Spent: 2h 40m  (was: 2.5h)

> Update artifact staging and retrieval protocols to be dependency aware.
> ---
>
> Key: BEAM-9577
> URL: https://issues.apache.org/jira/browse/BEAM-9577
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-9331) The Row object needs better builders

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9331?focusedWorklogId=411482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411482
 ]

ASF GitHub Bot logged work on BEAM-9331:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:09
Start Date: 27/Mar/20 23:09
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on pull request #10883: [BEAM-9331] 
Add better Row builders
URL: https://github.com/apache/beam/pull/10883
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 411482)
Time Spent: 5h 10m  (was: 5h)

> The Row object needs better builders
> 
>
> Key: BEAM-9331
> URL: https://issues.apache.org/jira/browse/BEAM-9331
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Users should be able to build a Row object by specifying field names. Desired 
> syntax:
>  
> Row.withSchema(schema)
>   .withFieldName("field1", "value")
>   .withFieldName("field2.field3", value)
>   .build()
>  
> Users should also have a builder that allows taking an existing row and 
> changing specific fields.
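To make the two requested builders concrete, here is a small Python analogue. It is purely illustrative and is not the Java API proposed in the PR: `RowBuilder`, `with_field_name`, and `from_row` are hypothetical names, the "schema" is just a list of top-level field names, and rows are plain nested dicts.

```python
import copy


class RowBuilder:
    """Builds a nested dict "row" by field name (illustrative only)."""

    def __init__(self, field_names):
        self._allowed = set(field_names)
        self._values = {}

    @classmethod
    def from_row(cls, field_names, row):
        # Start from a copy of an existing row so it is never mutated.
        builder = cls(field_names)
        builder._values = copy.deepcopy(row)
        return builder

    def with_field_name(self, path, value):
        parts = path.split(".")
        if parts[0] not in self._allowed:
            raise KeyError("unknown field: %s" % parts[0])
        # Walk (and create) intermediate dicts for dotted paths.
        node = self._values
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
        return self

    def build(self):
        return copy.deepcopy(self._values)


row = (
    RowBuilder(["field1", "field2"])
    .with_field_name("field1", "value")
    .with_field_name("field2.field3", 7)
    .build())
```

`from_row` covers the second request in the description: take an existing row, change specific fields, and leave the original untouched.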



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411470
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399571295
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding):
 },
 }
 
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-    """Returns the cloud encoding of the coder for the output of a transform."""
-    from apache_beam.runners.portability.fn_api_runner.translations import \
-      only_element
-    if len(transform_node.outputs) == 1:
-      output_tag = only_element(transform_node.outputs.keys())
-      # TODO(robertwb): Handle type hints for multi-output transforms.
+  def _get_encoded_output_coder(
+      self, transform_node, window_value=True, output_tag=None):
+    """Returns the cloud encoding of the coder for the output of a transform.
+
+    If output_tag is not specified, we assume all outputs to have the same
+    encoding.
+    """
+    from apache_beam.transforms.core import RunnerAPIPTransformHolder
+    external_transform = isinstance(
+        transform_node.transform, RunnerAPIPTransformHolder)
+    if external_transform and not output_tag:
+      raise Exception('For external transforms, output_tag must be specified')
+
+    if not output_tag:
+      output_tag = next(iter(transform_node.outputs.keys()))
 
 Review comment:
   This is only legal if there is only one output, right? Given the logic, it 
seems it's never actually used. 
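The concern is that `next(iter(...))` silently picks an arbitrary output when there are several, whereas a helper like the removed `only_element` makes the single-output assumption explicit. A minimal sketch, where this `only_element` is a stand-in written for illustration and not Beam's actual implementation:

```python
def only_element(iterable):
    """Return the single element of iterable, or raise if there is not exactly one."""
    elements = list(iterable)
    if len(elements) != 1:
        raise ValueError(
            "expected exactly one element, got %d" % len(elements))
    return elements[0]


# Hypothetical transform outputs keyed by tag.
outputs = {"main": "pc_main", "errors": "pc_errors"}

# next(iter(...)) does not complain about the second output; it just
# returns the first key in insertion order.
first_tag = next(iter(outputs.keys()))
```

With two outputs, `only_element(outputs.keys())` raises `ValueError`, while `first_tag` is quietly set to `"main"`.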
 



Issue Time Tracking
---

Worklog Id: (was: 411470)
Time Spent: 12h  (was: 11h 50m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411472
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399571052
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding):
 },
 }
 
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-    """Returns the cloud encoding of the coder for the output of a transform."""
-    from apache_beam.runners.portability.fn_api_runner.translations import \
-      only_element
-    if len(transform_node.outputs) == 1:
-      output_tag = only_element(transform_node.outputs.keys())
-      # TODO(robertwb): Handle type hints for multi-output transforms.
+  def _get_encoded_output_coder(
+      self, transform_node, window_value=True, output_tag=None):
+    """Returns the cloud encoding of the coder for the output of a transform.
+
+    If output_tag is not specified, we assume all outputs to have the same
+    encoding.
+    """
+    from apache_beam.transforms.core import RunnerAPIPTransformHolder
 
 Review comment:
   Ditto on import. (And any below.)
 



Issue Time Tracking
---

Worklog Id: (was: 411472)
Time Spent: 12h 10m  (was: 12h)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h 10m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411473
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399574133
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -925,8 +970,11 @@ def run_ParDo(self, transform_node, options):
 (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
  use_unified_worker)):
   # Patch side input ids to be unique across a given pipeline.
-  if (label_renames and
-  transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+  # This should not be done for external transforms since external SDKs may
 
 Review comment:
   It seems cleaner to simply not populate the label_renames map in this case 
rather than add a branch here. 
 



Issue Time Tracking
---

Worklog Id: (was: 411473)
Time Spent: 12h 20m  (was: 12h 10m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411471
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399570138
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: beam_runner_api_pb2.PTransform
                       context  # type: PipelineContext
                      ):
     # type: (...) -> AppliedPTransform
-    def is_side_input(tag):
+    def is_python_side_input(tag):
       # type: (str) -> bool
       # As per named_inputs() above.
-      return tag.startswith('side')
+      return re.match(SIDE_INPUT_REGEX, tag)
+
+    side_input_tags = []
+    if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+      # Preserving side input tags.
+      from apache_beam.utils import proto_utils
+      from apache_beam.portability.api import beam_runner_api_pb2
+      payload = (
+          proto_utils.parse_Bytes(
+              proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
+      for tag, si in payload.side_inputs.items():
+        side_input_tags.append(tag)
 
     main_inputs = [
         context.pcollections.get_by_id(id) for tag,
-        id in proto.inputs.items() if not is_side_input(tag)
+        id in proto.inputs.items() if tag not in side_input_tags
     ]
 
-    # Ordering is important here.
-    indexed_side_inputs = [
-        (get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag,
-        id in proto.inputs.items() if is_side_input(tag)
-    ]
+    # Using a list here so that we can pass this into a function
+    # TODO: use nonlocal after fully migrated to Python3.
+    next_index = [0]
+
+    def _get_sideinput_index(tag, next_index):
 
 Review comment:
   It feels like this could result in duplicate indices if one has side input 
tags named ['tag', 'side0', ...]. But maybe in that case it's OK? 
   
   Please reference (create?) a JIRA about making side inputs always key-valued 
rather than having this kind of logic (here and elsewhere).
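The duplicate-index concern can be reproduced with a toy version of this kind of logic. The body of `_get_sideinput_index` is not shown in this thread, so the scheme below is an assumption made for illustration: tags matching a `side<N>` pattern take index N, and any other tag takes the next value of a running counter. The regex is likewise assumed.

```python
import re

# Assumed pattern for Python-style side input tags such as "side0".
SIDE_INPUT_REGEX = r"^side(\d+)$"


def assign_indices(tags):
    """Assign an index to each side input tag (illustrative scheme)."""
    next_index = 0
    indices = {}
    for tag in tags:
        match = re.match(SIDE_INPUT_REGEX, tag)
        if match:
            # Python-style tag: the index is encoded in the tag itself.
            indices[tag] = int(match.group(1))
        else:
            # Any other tag: hand out the next counter value.
            indices[tag] = next_index
            next_index += 1
    return indices


collision = assign_indices(["tag", "side0"])
```

Under this scheme `"tag"` takes counter value 0 and `"side0"` decodes to 0 as well, so two distinct side inputs share an index. Keying side inputs by tag throughout would avoid the need for any such mapping.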
 



Issue Time Tracking
---

Worklog Id: (was: 411471)
Time Spent: 12h  (was: 11h 50m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411477
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399575610
 
 

 ##
 File path: sdks/python/apache_beam/runners/pipeline_context.py
 ##
 @@ -115,9 +115,12 @@ def get_id_to_proto_map(self):
 # type: () -> Dict[str, message.Message]
 return self._id_to_proto
 
-  def put_proto(self, id, proto):
+  def get_proto_from_id(self, id):
+return self.get_id_to_proto_map()[id]
+
+  def put_proto(self, id, proto, ignore_duplicates=False):
 # type: (str, message.Message) -> str
-if id in self._id_to_proto:
+if not ignore_duplicates and id in self._id_to_proto:
 
 Review comment:
   If ignore_duplicates=True, should we ensure that the protos are the same? 
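
One way this check could look, as a sketch only: `ProtoRegistry` is a hypothetical stand-in for the id-to-proto map in `pipeline_context.py`, and the exception type and messages are assumptions. It relies on the stored objects supporting `==`, which protobuf messages do.

```python
class ProtoRegistry(object):
  """Hypothetical stand-in for the id -> proto map under review."""
  def __init__(self):
    self._id_to_proto = {}

  def put_proto(self, id, proto, ignore_duplicates=False):
    if id in self._id_to_proto:
      if not ignore_duplicates:
        raise ValueError("Id '%s' is already taken." % id)
      # Duplicates are tolerated only when they carry the same content.
      if self._id_to_proto[id] != proto:
        raise ValueError(
            "Id '%s' is already registered with a different proto." % id)
    self._id_to_proto[id] = proto
    return id
```

With this shape, `ignore_duplicates=True` makes re-registration idempotent but still surfaces a genuine id collision instead of silently overwriting the earlier entry.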
 



Issue Time Tracking
---

Worklog Id: (was: 411477)
Time Spent: 12.5h  (was: 12h 20m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411468=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411468
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399570500
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -558,6 +564,14 @@ def run_pipeline(self, pipeline, options):
 result.metric_results = self._metrics
 return result
 
+  def _get_encoding_for_external_environment(self, element):
+if not isinstance(element, ElementTypeHolder):
+  raise Exception(
+  'Expected to receive an object of type ElementTypeHolder '
+  'but received %r' % element)
+from apache_beam.coders.coders import ExternalCoder
 
 Review comment:
   Can `apache_beam.coders.coders` not be imported at the top? In which case, 
it seems this whole method can be a one-liner at the call site.
 



Issue Time Tracking
---

Worklog Id: (was: 411468)
Time Spent: 11h 50m  (was: 11h 40m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411478=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411478
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399574958
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -995,10 +1044,13 @@ def run_ParDo(self, transform_node, options):
   # The assumption here is that all outputs will have the same typehint
   # and coder as the main output. This is certainly the case right now
   # but conceivably it could change in the future.
+  encoding = self._get_encoded_output_coder(
+  transform_node,
+  output_tag=side_tag) if external_transform else step.encoding
 
 Review comment:
   Why do we need to branch on external_transform here? 
 



Issue Time Tracking
---

Worklog Id: (was: 411478)
Time Spent: 12.5h  (was: 12h 20m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411479=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411479
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399573259
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding):
 },
 }
 
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-"""Returns the cloud encoding of the coder for the output of a 
transform."""
-from apache_beam.runners.portability.fn_api_runner.translations import \
-  only_element
-if len(transform_node.outputs) == 1:
-  output_tag = only_element(transform_node.outputs.keys())
-  # TODO(robertwb): Handle type hints for multi-output transforms.
+  def _get_encoded_output_coder(
+  self, transform_node, window_value=True, output_tag=None):
+"""Returns the cloud encoding of the coder for the output of a transform.
+
+If output_tag is not specified, we assume all outputs to have the same
+encoding.
+"""
+from apache_beam.transforms.core import RunnerAPIPTransformHolder
+external_transform = isinstance(
+transform_node.transform, RunnerAPIPTransformHolder)
+if external_transform and not output_tag:
+  raise Exception('For external transforms, output_tag must be specified')
+
+if not output_tag:
+  output_tag = next(iter(transform_node.outputs.keys()))
+
+element_type = None
+if external_transform:
   element_type = transform_node.outputs[output_tag].element_type
+  # We perform special encoding for any external coders that cannot be
+  # parsed in Python SDK.
+  if isinstance(element_type, ElementTypeHolder):
+coder = self._get_encoding_for_external_environment(element_type)
+if window_value:
 
 Review comment:
   Don't duplicate this logic, instead augment `_get_typehint_based_encoding` 
(well, really `coders.registry.get_coder`) to do the natural thing for 
ElementTypeHolders. 
 



Issue Time Tracking
---

Worklog Id: (was: 411479)
Time Spent: 12.5h  (was: 12h 20m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411462=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411462
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399561796
 
 

 ##
 File path: sdks/python/apache_beam/coders/coders.py
 ##
 @@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context):
 except Exception:
   if context.allow_proto_holders:
 # ignore this typing scenario for now, since it can't be easily tracked
-return RunnerAPICoderHolder(coder_proto)  # type: ignore
+return ExternalCoder(
 
 Review comment:
   I'd rather not do this for arbitrary exceptions, instead let's do this iff 
`coder_proto.spec.urn` is not in `cls._known_urns`.
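
A rough sketch of what dispatching on the URN instead of on a caught exception might look like. Everything here is illustrative: `OpaqueCoder`, `KNOWN_URNS`, `coder_from_spec`, and the `example:coder:broken:v1` URN are hypothetical names standing in for `ExternalCoder`, `cls._known_urns`, and `from_runner_api`.

```python
class OpaqueCoder(object):
  """Hypothetical placeholder for a coder the local SDK cannot construct."""
  def __init__(self, urn, payload):
    self.urn = urn
    self.payload = payload


def _bytes_coder(payload):
  return ('bytes', payload)


def _broken_coder(payload):
  # Simulates a known coder whose construction fails on bad input.
  raise ValueError('malformed payload: %r' % (payload,))


# Illustrative registry; stands in for cls._known_urns.
KNOWN_URNS = {
    'beam:coder:bytes:v1': _bytes_coder,
    'example:coder:broken:v1': _broken_coder,
}


def coder_from_spec(urn, payload, allow_proto_holders=False):
  if urn not in KNOWN_URNS:
    # Only a genuinely unknown URN falls back to the placeholder.
    if allow_proto_holders:
      return OpaqueCoder(urn, payload)
    raise KeyError('Unknown coder URN: %s' % urn)
  # Errors while building a known coder propagate instead of being masked.
  return KNOWN_URNS[urn](payload)
```

The point of the sketch is the last line: a bug in a known coder's constructor stays visible rather than being converted into an external coder.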
 



Issue Time Tracking
---

Worklog Id: (was: 411462)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411461=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411461
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399564536
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
 
 Review comment:
   This'll break if a Java user coincidentally gives their side inputs 
names that match. 
 



Issue Time Tracking
---

Worklog Id: (was: 411461)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411469=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411469
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399566029
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
+
+side_input_tags = []
+if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+  # Preserving side input tags.
+  from apache_beam.utils import proto_utils
+  from apache_beam.portability.api import beam_runner_api_pb2
+  payload = (
+  proto_utils.parse_Bytes(
+  proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
+  for tag, si in payload.side_inputs.items():
 
 Review comment:
   List comprehension? Or just 
   
   ```
   if ...:
     ...
     side_input_tags = list(payload.side_inputs.keys())
   else:
     side_input_tags = []
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 411469)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411460=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411460
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399563372
 
 

 ##
 File path: sdks/python/apache_beam/coders/coders.py
 ##
 @@ -1383,22 +1375,74 @@ def from_runner_api_parameter(payload, components, 
context):
 write_state_threshold=int(payload))
 
 
-class RunnerAPICoderHolder(Coder):
+class ElementTypeHolder(typehints.TypeConstraint):
+  """A dummy element type for external coders that cannot be parsed in 
Python"""
+  def __init__(self, coder, context):
+self.coder = coder
+self.context = context
+
+
+class ExternalCoder(Coder):
   """A `Coder` that holds a runner API `Coder` proto.
 
   This is used for coders for which corresponding objects cannot be
   initialized in Python SDK. For example, coders for remote SDKs that may
   be available in Python SDK transform graph when expanding a cross-language
   transform.
   """
-  def __init__(self, proto):
-self._proto = proto
 
-  def proto(self):
-return self._proto
+  coder_count = 0
 
-  def to_runner_api(self, context):
-return self._proto
+  def __init__(self, element_type_holder):
+self.element_type_holder = element_type_holder
 
-  def to_type_hint(self):
-return Any
+  def as_cloud_object(self, coders_context=None):
+if not coders_context:
+  raise Exception(
+  'coders_context must be specified to correctly encode external 
coders'
+  )
+coder_id = coders_context.get_by_proto(
+self.element_type_holder.coder, deduplicate=True)
+
+coder_proto = self.element_type_holder.coder
+
+kind_str = 'kind:external' + str(ExternalCoder.coder_count)
+ExternalCoder.coder_count = ExternalCoder.coder_count + 1
+component_encodings = []
+if coder_proto.spec.urn == 'beam:coder:kv:v1':
 
 Review comment:
   I anticipate `kind:stream` will be needed to handle GBK of unknown types. 
Others may be needed for other cases, or in the future, and it seems risky to 
enumerate them here and in the dataflow runner. There may also be cases where 
we have to go more than one level deep. We should try to return the same thing 
the external SDK would have returned just to be safe, and that means wrapping 
only the leaves as external coders. I think that'll clean stuff up as well 
(e.g. no need for `_coerce_to_kv_type_from_external_type`). 
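
A minimal sketch of the "wrap only the leaves" idea, under simplifying assumptions: a coder is modelled as a `(urn, components)` tuple rather than a real proto, `KNOWN_URNS` is an illustrative set, and `wrap_unknown_leaves` and the `other:sdk:custom:v1` URN are hypothetical names.

```python
from collections import namedtuple

# Simplified model of a coder spec: a URN plus component coder specs.
CoderSpec = namedtuple('CoderSpec', ['urn', 'components'])

# Illustrative set of URNs the local SDK understands.
KNOWN_URNS = frozenset([
    'beam:coder:kv:v1',
    'beam:coder:iterable:v1',
    'beam:coder:bytes:v1',
])


def wrap_unknown_leaves(spec):
  """Keeps known structure as is and wraps only unknown subtrees."""
  if spec.urn not in KNOWN_URNS:
    # The whole unknown subtree becomes a single opaque external leaf.
    return ('external', spec)
  # Known composites are preserved at any depth; recurse into components.
  return (spec.urn, [wrap_unknown_leaves(c) for c in spec.components])


custom = CoderSpec('other:sdk:custom:v1', [])
kv = CoderSpec(
    'beam:coder:kv:v1',
    [
        CoderSpec('beam:coder:bytes:v1', []),
        CoderSpec('beam:coder:iterable:v1', [custom]),
    ])
wrapped = wrap_unknown_leaves(kv)
```

Because the recursion is driven by the coder tree itself, no per-kind enumeration (KV, stream, and so on) is needed at this level, and nesting deeper than one level is handled by the same code path.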
 



Issue Time Tracking
---

Worklog Id: (was: 411460)
Time Spent: 11h 20m  (was: 11h 10m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411467
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399569155
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
+
+side_input_tags = []
+if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+  # Preserving side input tags.
+  from apache_beam.utils import proto_utils
+  from apache_beam.portability.api import beam_runner_api_pb2
+  payload = (
+  proto_utils.parse_Bytes(
+  proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
+  for tag, si in payload.side_inputs.items():
+side_input_tags.append(tag)
 
 main_inputs = [
 context.pcollections.get_by_id(id) for tag,
-id in proto.inputs.items() if not is_side_input(tag)
+id in proto.inputs.items() if tag not in side_input_tags
 ]
 
-# Ordering is important here.
-indexed_side_inputs = [
-(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag,
-id in proto.inputs.items() if is_side_input(tag)
-]
+# Using a list here so that we can pass this into a function
+# TODO: use nonlocal after fully migrated to Python3.
+next_index = [0]
+
+def _get_sideinput_index(tag, next_index):
+  if is_python_side_input(tag):
+return get_sideinput_index(tag)
+  else:
+index = next_index[0]
+next_index[0] = next_index[0] + 1
+return index
+
+# Ordering is important here for Python sideinputs.
+indexed_side_inputs = [(
+_get_sideinput_index(tag, next_index),
+context.pcollections.get_by_id(id)) for tag,
+   id in proto.inputs.items() if tag in 
side_input_tags]
 side_inputs = [si for _, si in sorted(indexed_side_inputs)]
+
+input_tags_to_preserve = {}
+
 transform = ptransform.PTransform.from_runner_api(proto, context)
+if isinstance(transform, RunnerAPIPTransformHolder):
+  # For external transforms that are ParDos, we have to set side-inputs
+  # manually and preserve input tags.
+  transform.side_inputs = [pvalue.AsMultiMap(pc) for pc in side_inputs]
 
 Review comment:
   Is AsMultiMap always the right thing to use?
 



Issue Time Tracking
---

Worklog Id: (was: 411467)
Time Spent: 11h 50m  (was: 11h 40m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411464=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411464
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399565439
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
+
+side_input_tags = []
+if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+  # Preserving side input tags.
+  from apache_beam.utils import proto_utils
 
 Review comment:
   Import at the top unless there's a reason (e.g. circular imports) that 
prevents it. 
 



Issue Time Tracking
---

Worklog Id: (was: 411464)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411463=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411463
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399564877
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1110,7 +1117,8 @@ def transform_to_runner_api(transform,  # type: 
Optional[ptransform.PTransform]
 for part in self.parts
 ],
 inputs={
-tag: context.pcollections.get_id(pc)
+_may_be_preserve_tag(tag, pc, self.input_tags_to_preserve):
+context.pcollections.get_id(pc)
 for tag,
 pc in sorted(self.named_inputs().items())
 
 Review comment:
   While we're here, did yapf not like this on the previous line? Maybe write 
`(tag, pc)` (unless it tries to split that too). 
 



Issue Time Tracking
---

Worklog Id: (was: 411463)
Time Spent: 11.5h  (was: 11h 20m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411466
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399563919
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1102,6 +1105,10 @@ def transform_to_runner_api(transform,  # type: 
Optional[ptransform.PTransform]
 (transform_urn in Pipeline.sdk_transforms_with_environment())):
   environment_id = context.default_environment_id()
 
+def _may_be_preserve_tag(new_tag, pc, input_tags_to_preserve):
 
 Review comment:
   `maybe` is one word.
 



Issue Time Tracking
---

Worklog Id: (was: 411466)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411480=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411480
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399571350
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding):
 },
 }
 
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-"""Returns the cloud encoding of the coder for the output of a 
transform."""
-from apache_beam.runners.portability.fn_api_runner.translations import \
-  only_element
-if len(transform_node.outputs) == 1:
-  output_tag = only_element(transform_node.outputs.keys())
-  # TODO(robertwb): Handle type hints for multi-output transforms.
+  def _get_encoded_output_coder(
+  self, transform_node, window_value=True, output_tag=None):
+"""Returns the cloud encoding of the coder for the output of a transform.
+
+If output_tag is not specified, we assume all outputs to have the same
+encoding.
+"""
+from apache_beam.transforms.core import RunnerAPIPTransformHolder
+external_transform = isinstance(
+transform_node.transform, RunnerAPIPTransformHolder)
+if external_transform and not output_tag:
+  raise Exception('For external transforms, output_tag must be specified')
 
 Review comment:
   Why?
 



Issue Time Tracking
---

Worklog Id: (was: 411480)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411475=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411475
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399576434
 
 

 ##
 File path: sdks/python/apache_beam/coders/coders.py
 ##
 @@ -1383,22 +1384,69 @@ def from_runner_api_parameter(payload, components, 
context):
 write_state_threshold=int(payload))
 
 
-class RunnerAPICoderHolder(Coder):
+class ElementTypeHolder(typehints.TypeConstraint):
 
 Review comment:
   Should this be an ExternalElementType?
   
   Even better, perhaps this should be a CoderElementType that holds a Coder 
(external or not), and then an ExternalCoder would be a Coder that holds a 
proto. 
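
A minimal sketch of the split suggested above, assuming plain stand-in classes rather than the real Beam types. The names `CoderElementType` and `ExternalCoder` come from the comment; `Coder`, `BytesCoder`, and the dict used as a proto are hypothetical placeholders.

```python
# Hypothetical stand-ins; none of these classes are the real Beam API.
class Coder(object):
  """Minimal stand-in for a coder base class."""
  def encode(self, value):
    raise NotImplementedError


class BytesCoder(Coder):
  """A local coder with a real implementation."""
  def encode(self, value):
    return bytes(value)


class ExternalCoder(Coder):
  """A coder known only by its proto; it cannot encode locally."""
  def __init__(self, coder_proto):
    self.coder_proto = coder_proto

  def encode(self, value):
    raise NotImplementedError('Coder is implemented by a remote SDK.')


class CoderElementType(object):
  """A type constraint that carries the coder for its elements."""
  def __init__(self, coder):
    self.coder = coder

  def is_external(self):
    return isinstance(self.coder, ExternalCoder)


local_type = CoderElementType(BytesCoder())
# The dict below is a placeholder for a coder proto.
remote_type = CoderElementType(ExternalCoder({'urn': 'hypothetical:urn'}))
```

With this shape the element type does not need to know whether its coder is remote; only the coder does.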
 



Issue Time Tracking
---

Worklog Id: (was: 411475)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411465=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411465
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399568963
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
+
+side_input_tags = []
+if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+  # Preserving side input tags.
+  from apache_beam.utils import proto_utils
+  from apache_beam.portability.api import beam_runner_api_pb2
+  payload = (
+  proto_utils.parse_Bytes(
+  proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
+  for tag, si in payload.side_inputs.items():
+side_input_tags.append(tag)
 
 main_inputs = [
 context.pcollections.get_by_id(id) for tag,
-id in proto.inputs.items() if not is_side_input(tag)
+id in proto.inputs.items() if tag not in side_input_tags
 ]
 
-# Ordering is important here.
-indexed_side_inputs = [
-(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag,
-id in proto.inputs.items() if is_side_input(tag)
-]
+# Using a list here so that we can pass this into a function
+# TODO: use nonlocal after fully migrated to Python3.
+next_index = [0]
+
+def _get_sideinput_index(tag, next_index):
+  if is_python_side_input(tag):
+return get_sideinput_index(tag)
+  else:
+index = next_index[0]
+next_index[0] = next_index[0] + 1
+return index
+
+# Ordering is important here for Python sideinputs.
+indexed_side_inputs = [(
+_get_sideinput_index(tag, next_index),
+context.pcollections.get_by_id(id)) for tag,
+   id in proto.inputs.items() if tag in 
side_input_tags]
 side_inputs = [si for _, si in sorted(indexed_side_inputs)]
+
+input_tags_to_preserve = {}
 
 Review comment:
   https://engdoc.corp.google.com/eng/doc/devguide/py/totw/026.md?cl=head
 



Issue Time Tracking
---

Worklog Id: (was: 411465)
Time Spent: 11h 40m  (was: 11.5h)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411474=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411474
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399569834
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -1128,29 +1136,67 @@ def from_runner_api(proto,  # type: 
beam_runner_api_pb2.PTransform
   context  # type: PipelineContext
  ):
 # type: (...) -> AppliedPTransform
-def is_side_input(tag):
+def is_python_side_input(tag):
   # type: (str) -> bool
   # As per named_inputs() above.
-  return tag.startswith('side')
+  return re.match(SIDE_INPUT_REGEX, tag)
+
+side_input_tags = []
+if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
+  # Preserving side input tags.
+  from apache_beam.utils import proto_utils
+  from apache_beam.portability.api import beam_runner_api_pb2
+  payload = (
+  proto_utils.parse_Bytes(
+  proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
+  for tag, si in payload.side_inputs.items():
+side_input_tags.append(tag)
 
 main_inputs = [
 context.pcollections.get_by_id(id) for tag,
-id in proto.inputs.items() if not is_side_input(tag)
+id in proto.inputs.items() if tag not in side_input_tags
 ]
 
-# Ordering is important here.
-indexed_side_inputs = [
-(get_sideinput_index(tag), context.pcollections.get_by_id(id)) for tag,
-id in proto.inputs.items() if is_side_input(tag)
-]
+# Using a list here so that we can pass this into a function
+# TODO: use nonlocal after fully migrated to Python3.
+next_index = [0]
+
+def _get_sideinput_index(tag, next_index):
+  if is_python_side_input(tag):
+return get_sideinput_index(tag)
+  else:
+index = next_index[0]
+next_index[0] = next_index[0] + 1
+return index
+
+# Ordering is important here for Python sideinputs.
+indexed_side_inputs = [(
+_get_sideinput_index(tag, next_index),
+context.pcollections.get_by_id(id)) for tag,
+   id in proto.inputs.items() if tag in 
side_input_tags]
 side_inputs = [si for _, si in sorted(indexed_side_inputs)]
+
+input_tags_to_preserve = {}
+
 transform = ptransform.PTransform.from_runner_api(proto, context)
+if isinstance(transform, RunnerAPIPTransformHolder):
 
 Review comment:
   Is the sorting by python side input index needed at all in this path? If 
not, perhaps it should be in an else clause (where get_sideinput_index could be 
used unconditionally)?
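
One way to read this suggestion, as a standalone sketch: index-based sorting happens only on the Python path, and tags from a remote SDK keep their arrival order. The regex value, the `order_side_inputs` helper, and the `is_external` flag are assumptions for illustration, not code from the PR.

```python
import re

# Assumed pattern for Python-generated side input tags such as 'side0'.
SIDE_INPUT_REGEX = r'side([0-9]+)'


def get_sideinput_index(tag):
  match = re.match(SIDE_INPUT_REGEX, tag)
  if match is None:
    raise ValueError('Not a Python side input tag: %r' % tag)
  return int(match.group(1))


def order_side_inputs(inputs, side_input_tags, is_external):
  """Returns the side input values of `inputs` (tag -> value) in order."""
  tagged = [
      (tag, value) for tag, value in inputs.items() if tag in side_input_tags
  ]
  if is_external:
    # Tags from a remote SDK carry no index; keep the order they arrive in.
    return [value for _, value in tagged]
  else:
    # Python tags encode their position, so the index is used unconditionally.
    indexed = [(get_sideinput_index(tag), value) for tag, value in tagged]
    return [value for _, value in sorted(indexed, key=lambda pair: pair[0])]


python_inputs = {'0': 'main', 'side1': 'b', 'side0': 'a'}
external_inputs = {'in': 'main', 'lookup': 'x', 'config': 'y'}
```

The external branch relies on dict insertion order, which Python guarantees from 3.7 onward.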
 



Issue Time Tracking
---

Worklog Id: (was: 411474)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=411476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411476
 ]

ASF GitHub Bot logged work on BEAM-8019:


Author: ASF GitHub Bot
Created on: 27/Mar/20 23:07
Start Date: 27/Mar/20 23:07
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11185: [BEAM-8019] 
Updates Python SDK to handle remote SDK coders and preserve tags added by 
remote SDKs
URL: https://github.com/apache/beam/pull/11185#discussion_r399572771
 
 

 ##
 File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
 ##
 @@ -597,19 +611,47 @@ def _get_side_input_encoding(self, input_encoding):
 },
 }
 
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-"""Returns the cloud encoding of the coder for the output of a 
transform."""
-from apache_beam.runners.portability.fn_api_runner.translations import \
-  only_element
-if len(transform_node.outputs) == 1:
-  output_tag = only_element(transform_node.outputs.keys())
-  # TODO(robertwb): Handle type hints for multi-output transforms.
+  def _get_encoded_output_coder(
+  self, transform_node, window_value=True, output_tag=None):
+"""Returns the cloud encoding of the coder for the output of a transform.
+
+If output_tag is not specified, we assume all outputs to have the same
+encoding.
+"""
+from apache_beam.transforms.core import RunnerAPIPTransformHolder
+external_transform = isinstance(
+transform_node.transform, RunnerAPIPTransformHolder)
+if external_transform and not output_tag:
+  raise Exception('For external transforms, output_tag must be specified')
+
+if not output_tag:
+  output_tag = next(iter(transform_node.outputs.keys()))
+
+element_type = None
+if external_transform:
 
 Review comment:
   Seems the logic should be 
   
   ```
   if output_tag:
     element_type = transform_node.outputs[output_tag].element_type
   elif len(transform_node.outputs) == 1:
     element_type = only_element(transform_node.outputs.values()).element_type
   elif external:
     raise ValueError("Tag required for external...")
   else:
     element_type = typehints.Any
   ```
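
The same branching can be exercised in isolation. The sketch below is not Beam code: `Any`, `FakePCollection`, `resolve_element_type`, and the error message are hypothetical stand-ins, and `only_element` is reimplemented locally so the block runs on its own.

```python
# Stand-ins for illustration only; these are not the real Beam classes.
class Any(object):
  """Placeholder for typehints.Any."""


class FakePCollection(object):
  def __init__(self, element_type):
    self.element_type = element_type


def only_element(iterable):
  # Unpacking fails unless the iterable holds exactly one item.
  element, = iterable
  return element


def resolve_element_type(outputs, output_tag=None, external=False):
  """Picks the element type for one output of a transform."""
  if output_tag:
    return outputs[output_tag].element_type
  elif len(outputs) == 1:
    return only_element(outputs.values()).element_type
  elif external:
    raise ValueError('Tag required for external transforms.')
  else:
    return Any


single = {'out': FakePCollection(int)}
multi = {'a': FakePCollection(int), 'b': FakePCollection(str)}
```

In this ordering a single-output external transform does not need a tag; the error is reserved for the ambiguous multi-output case.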
 



Issue Time Tracking
---

Worklog Id: (was: 411476)
Time Spent: 12.5h  (was: 12h 20m)

> Support cross-language transforms for DataflowRunner
> 
>
> Key: BEAM-8019
> URL: https://issues.apache.org/jira/browse/BEAM-8019
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chamikara Madhusanka Jayalath
>Assignee: Chamikara Madhusanka Jayalath
>Priority: Major
>  Time Spent: 12.5h
>  Remaining Estimate: 0h
>
> This is to capture the Beam changes needed for this task.





[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411449=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411449
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:43
Start Date: 27/Mar/20 22:43
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570655
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing(
 cache_token_generator=cache_token_generator)
 testing_bundle_manager.process_bundle(data_input, data_output)
   finally:
-worker_handler.state.restore()
+
runner_execution_context.worker_handler_manager.state_servicer.restore()
 
   def _collect_written_timers_and_add_to_deferred_inputs(
   self,
-  pipeline_components,  # type: beam_runner_api_pb2.Components
-  stage,  # type: translations.Stage
+  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
   bundle_context_manager,  # type: execution.BundleContextManager
   deferred_inputs,  # type: MutableMapping[str, PartitionableBuffer]
-  data_channel_coders,  # type: Mapping[str, str]
   ):
 # type: (...) -> None
 
-for transform_id, timer_writes in stage.timer_pcollections:
+for transform_id, timer_writes in \
 
 Review comment:
   Done
 



Issue Time Tracking
---

Worklog Id: (was: 411449)
Time Spent: 2h 20m  (was: 2h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411448
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:43
Start Date: 27/Mar/20 22:43
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570601
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   I've just added the state_servicer() from your first comment, but I've 
created a Jira issue to track fixing that later on.
 



Issue Time Tracking
---

Worklog Id: (was: 411448)
Time Spent: 2h 10m  (was: 2h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411445
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570217
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
 
 Review comment:
   Done.
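
For readers following the diff, the lazily initialized property pattern it introduces can be sketched on its own. `FakeWorkerHandlerManager` and `LazyBundleContext` are hypothetical stand-ins, not the classes from the PR; only the `worker_handlers` and `get_worker_handlers` names are borrowed from the diff.

```python
class FakeWorkerHandlerManager(object):
  """Hypothetical manager that records how often handlers are requested."""
  def __init__(self):
    self.calls = 0

  def get_worker_handlers(self, environment, num_workers):
    self.calls += 1
    return ['%s-worker-%d' % (environment, i) for i in range(num_workers)]


class LazyBundleContext(object):
  """Defers the handler lookup until the property is first read."""
  def __init__(self, manager, environment, num_workers):
    self._manager = manager
    self._environment = environment
    self._num_workers = num_workers
    self._worker_handlers = None  # Filled in on first access.

  @property
  def worker_handlers(self):
    if self._worker_handlers is None:
      self._worker_handlers = self._manager.get_worker_handlers(
          self._environment, self._num_workers)
    return self._worker_handlers


manager = FakeWorkerHandlerManager()
context = LazyBundleContext(manager, 'env', 2)
```

Constructing the context costs nothing; the manager is consulted once, on the first read of `worker_handlers`, and the result is reused afterwards.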
 



Issue Time Tracking
---

Worklog Id: (was: 411445)
Time Spent: 1h 40m  (was: 1.5h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411444=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411444
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570204
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 
 Review comment:
   Done.
 



Issue Time Tracking
---

Worklog Id: (was: 411444)
Time Spent: 1.5h  (was: 1h 20m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411447
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570251
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
+res = beam_fn_api_pb2.ProcessBundleDescriptor(
+id=self.bundle_uid,
+transforms={
+transform.unique_name: transform
+for transform in self.stage.transforms
+},
+pcollections=dict(
+self.execution_context.pipeline_components.pcollections.items()),
+

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411446
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:42
Start Date: 27/Mar/20 22:42
Worklog Time Spent: 10m 
  Work Description: pabloem commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399570237
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], 
List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: 
beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc 
server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
 
 Review comment:
   Done.
 



Issue Time Tracking
---

Worklog Id: (was: 411446)
Time Spent: 1h 50m  (was: 1h 40m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada

[jira] [Updated] (BEAM-9625) StateServicer should be owned by FnApiRunnerContextManager

2020-03-27 Thread Pablo Estrada (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Estrada updated BEAM-9625:

Status: Open  (was: Triage Needed)

> StateServicer should be owned by FnApiRunnerContextManager
> --
>
> Key: BEAM-9625
> URL: https://issues.apache.org/jira/browse/BEAM-9625
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Minor
>






[jira] [Created] (BEAM-9625) StateServicer should be owned by FnApiRunnerContextManager

2020-03-27 Thread Pablo Estrada (Jira)
Pablo Estrada created BEAM-9625:
---

 Summary: StateServicer should be owned by FnApiRunnerContextManager
 Key: BEAM-9625
 URL: https://issues.apache.org/jira/browse/BEAM-9625
 Project: Beam
  Issue Type: Sub-task
  Components: sdk-py-core
Reporter: Pablo Estrada
Assignee: Pablo Estrada








[jira] [Created] (BEAM-9624) Combine operation should support only converting to accumulators

2020-03-27 Thread Andrew Crites (Jira)
Andrew Crites created BEAM-9624:
---

 Summary: Combine operation should support only converting to 
accumulators
 Key: BEAM-9624
 URL: https://issues.apache.org/jira/browse/BEAM-9624
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Andrew Crites
Assignee: Andrew Crites


For streaming pipelines, we want to be able to lift the combiner into the 
MergeBuckets without having to also do a PartialGroupByKey before the shuffle. 
We don't want to do the PGBK since it could cause non-deterministic results 
when used with some triggers.

We propose adding a new URN that performs only the convert-to-accumulators 
step, and adding support for it in Java/Python/Go.
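
As a rough illustration of the step described above, the following sketch maps 
each input value to its own accumulator without grouping or buffering 
anything. The MeanCombiner class and the convert_to_accumulators helper are 
hypothetical plain-Python stand-ins, not Beam's CombineFn API or the proposed 
URN implementation.

```python
# Illustrative only: a hand-rolled mean combiner, not Beam's CombineFn API.
class MeanCombiner(object):
    def create_accumulator(self):
        return (0.0, 0)  # (running total, element count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        return (
            sum(acc[0] for acc in accumulators),
            sum(acc[1] for acc in accumulators))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def convert_to_accumulators(combiner, keyed_values):
    """Turn each (key, value) into (key, accumulator), one element at a time.

    Hypothetical helper: no per-key grouping happens here, so the output
    does not depend on how elements happen to be bundled.
    """
    for key, value in keyed_values:
        yield key, combiner.add_input(combiner.create_accumulator(), value)


combiner = MeanCombiner()
pairs = list(convert_to_accumulators(combiner, [('a', 2.0), ('a', 4.0)]))
merged = combiner.merge_accumulators(acc for _, acc in pairs)
mean = combiner.extract_output(merged)
```

In this sketch the merge after the shuffle sees one single-element accumulator 
per input, which is the behaviour the issue asks for in place of a 
PartialGroupByKey.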





[jira] [Work logged] (BEAM-9454) Add Deduplication transform for SDF

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9454?focusedWorklogId=411426=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411426
 ]

ASF GitHub Bot logged work on BEAM-9454:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:06
Start Date: 27/Mar/20 22:06
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11060: [BEAM-9454] 
Create Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r399559906
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate.py
 ##
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+'Deduplicate',
+'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """ A PTransform which deduplicates <key, value> pair over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
+  a runner and care might need to be used to ensure that the deduplication time
+  limit is long enough to remove duplicates but short enough to not cause
+  performance problems within a runner. Each runner may provide an optimized
+  implementation of their choice using the deduplication time domain and
+  threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+if processing_time_duration is None and event_time_duration is None:
+  raise ValueError(
+  'DeduplicatePerKey requires at lease provide either'
+  'processing_time_duration or event_time_duration.')
+self.processing_time_duration = processing_time_duration
+self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+processing_timer_spec = userstate.TimerSpec(
+'processing_timer', TimeDomain.REAL_TIME)
+event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+state_spec = userstate.BagStateSpec('seen', BooleanCoder())
 
 Review comment:
   To follow up from what we discussed off-line, I think combining state 
semantically makes slightly more sense here, but as we're not adding to the 
state unless it's the first element there are no performance concerns, and I'm 
also fine with this as is. 
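
   A minimal sketch of why the bag stays small, assuming a plain dict of lists 
as a stand-in for per-key bag state (this is not the userstate API, and the 
timers that later clear the state are omitted): the bag is written only when 
it is empty, so it never holds more than one entry per key.

```python
def deduplicate_per_key(elements):
    """Keep the first (key, value) for each key and drop later ones.

    `seen` is a hypothetical stand-in for a per-key bag of booleans.
    """
    seen = {}
    output = []
    for key, value in elements:
        bag = seen.setdefault(key, [])
        if not bag:           # empty bag: first element for this key
            bag.append(True)  # the only write this key ever receives
            output.append((key, value))
    return output, seen


output, seen = deduplicate_per_key([('a', 1), ('b', 2), ('a', 3)])
```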
 



Issue Time Tracking
---

Worklog Id: (was: 411426)
Time Spent: 5h 40m  (was: 5.5h)

> Add Deduplication transform for SDF
> ---
>
> Key: BEAM-9454
> URL: https://issues.apache.org/jira/browse/BEAM-9454
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core, sdk-py-core
>Reporter: Boyuan Zhang
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> When SDF is used as a source-like operation, it's necessary to provide a 
> default Deduplication transform for the SDF user to deduplicate values by 
> certain unique id.




[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411419=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411419
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399555388
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 
 Review comment:
   Fix description. 
   
   (Interestingly, I just did this change as well for another PR.)
 



Issue Time Tracking
---

Worklog Id: (was: 411419)
Time Spent: 1h  (was: 50m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411424=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411424
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399556279
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
 
 Review comment:
   Prefer ()'s to backslashes for line breaks.
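
   For illustration, the two continuation styles side by side. The Manager and 
Context classes are hypothetical stand-ins so the snippet runs on its own; 
only the line-break style is the point.

```python
class Manager(object):
    """Hypothetical stand-in for the worker handler manager."""
    def get_worker_handlers(self, environment, num_workers):
        return ['%s-%d' % (environment, i) for i in range(num_workers)]


class Context(object):
    """Hypothetical stand-in for the execution context."""
    worker_handler_manager = Manager()


execution_context = Context()

# Backslash continuation: breaks if anything follows the backslash.
handlers_backslash = execution_context.worker_handler_manager\
    .get_worker_handlers('env', 2)

# Parenthesized continuation: the open paren keeps the expression going.
handlers_parens = (
    execution_context.worker_handler_manager.get_worker_handlers('env', 2))
```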
 



Issue Time Tracking
---

Worklog Id: (was: 411424)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411421=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411421
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399556846
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
 
 Review comment:
   I don't know how critical this is for performance (mostly for tests), but 
`self.worker_handlers[0]` might be preferable. 
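
   A small sketch of the two spellings, assuming worker_handlers is a list (as 
the removed type comment, List[WorkerHandler], suggests). The strings are 
placeholders for real handlers.

```python
worker_handlers = ['handler-0', 'handler-1']  # placeholder values

# Works for any iterable, at the cost of building an iterator first.
first_via_iter = next(iter(worker_handlers))

# Works only for sequences, but reads more directly.
first_via_index = worker_handlers[0]


def first_error_name(getter):
    """Return the exception type name raised for an empty list."""
    try:
        getter([])
    except Exception as exn:  # broad on purpose: we only report the type
        return type(exn).__name__
    return None


iter_error = first_error_name(lambda handlers: next(iter(handlers)))
index_error = first_error_name(lambda handlers: handlers[0])
```

   The two forms also fail differently on an empty list: the iterator form 
raises StopIteration, while indexing raises IndexError.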
 



Issue Time Tracking
---

Worklog Id: (was: 411421)
Time Spent: 1h 10m  (was: 1h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: 

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411422=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411422
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399557974
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   Perhaps add a state_servicer() method right on runner_execution_context (and 
use several places below)?
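
   One possible shape for such an accessor, as a sketch only. The three classes 
below are simplified, hypothetical stand-ins for the ones in the PR, and 
whether this should be a method or a property is left open.

```python
class StateServicer(object):
    """Hypothetical stand-in that just records raw appends."""
    def __init__(self):
        self.appended = []

    def append_raw(self, state_key, data):
        self.appended.append((state_key, data))


class WorkerHandlerManager(object):
    """Hypothetical stand-in that owns the state servicer."""
    def __init__(self):
        self.state_servicer = StateServicer()


class RunnerExecutionContext(object):
    """Hypothetical stand-in for the runner execution context."""
    def __init__(self, worker_handler_manager):
        self.worker_handler_manager = worker_handler_manager

    def state_servicer(self):
        # Single place that knows where the servicer currently lives.
        return self.worker_handler_manager.state_servicer


runner_execution_context = RunnerExecutionContext(WorkerHandlerManager())
runner_execution_context.state_servicer().append_raw('state-key', b'elements')
```

   Call sites then shrink to one short expression, which also removes the need 
for a line continuation there.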
 



Issue Time Tracking
---

Worklog Id: (was: 411422)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411425=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411425
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399558954
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -369,8 +366,8 @@ def _store_side_inputs_in_state(self,
   state_key = beam_fn_api_pb2.StateKey(
   iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput(
   transform_id=transform_id, side_input_id=tag, window=window))
-  bundle_context_manager.worker_handler.state.append_raw(
-  state_key, elements_data)
+  runner_execution_context.worker_handler_manager.state_servicer\
 
 Review comment:
   On second thought, perhaps the ownership of state servicer should be moved 
up to runner_execution_context (though the worker manager may need a 
reference). Your call. 
 



Issue Time Tracking
---

Worklog Id: (was: 411425)
Time Spent: 1h 20m  (was: 1h 10m)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411423=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411423
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399557293
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
``beam.PCollection``.
  """
   def __init__(self,
-  worker_handler_factory,  # type: Callable[[Optional[str], int], List[WorkerHandler]]
+  worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
   pipeline_components,  # type: beam_runner_api_pb2.Components
   safe_coders,
   data_channel_coders,
):
 """
-:param worker_handler_factory: A ``callable`` that takes in an environment
+:param worker_handler_manager: A ``callable`` that takes in an environment
 id and a number of workers, and returns a list of ``WorkerHandler``s.
 :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
 :param safe_coders:
 :param data_channel_coders:
 """
 self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-self.worker_handler_factory = worker_handler_factory
+self.worker_handler_manager = worker_handler_manager
 self.pipeline_components = pipeline_components
 self.safe_coders = safe_coders
 self.data_channel_coders = data_channel_coders
 
+self.pipeline_context = pipeline_context.PipelineContext(
+self.pipeline_components,
+iterable_state_write=self._iterable_state_write)
+self._last_uid = -1
+
+  def next_uid(self):
+self._last_uid += 1
+return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+# type: (...) -> bytes
+token = unique_name(None, 'iter').encode('ascii')
+out = create_OutputStream()
+for element in values:
+  element_coder_impl.encode_to_stream(element, out, True)
+self.worker_handler_manager.state_servicer.append_raw(
+beam_fn_api_pb2.StateKey(
+runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+out.get())
+return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-  execution_context, # type: FnApiRunnerExecutionContext
-  process_bundle_descriptor,  # type: beam_fn_api_pb2.ProcessBundleDescriptor
-  worker_handler,  # type: fn_runner.WorkerHandler
-  p_context,  # type: pipeline_context.PipelineContext
-   ):
+   execution_context, # type: FnApiRunnerExecutionContext
+   stage,  # type: translations.Stage
+   num_workers,  # type: int
+  ):
 self.execution_context = execution_context
-self.process_bundle_descriptor = process_bundle_descriptor
-self.worker_handler = worker_handler
-self.pipeline_context = p_context
+self.stage = stage
+self.bundle_uid = self.execution_context.next_uid()
+self.num_workers = num_workers
+
+# Properties that are lazily initialized
+self._process_bundle_descriptor = None
+self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+if self._worker_handlers is None:
+  self._worker_handlers = self.execution_context.worker_handler_manager\
+.get_worker_handlers(self.stage.environment, self.num_workers)
+return self._worker_handlers
+
+  def data_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).data_api_service_descriptor()
+
+  def state_api_service_descriptor(self):
+# All worker_handlers share the same grpc server, so we can read grpc server
+# info from any worker_handler and read from the first worker_handler.
+return next(iter(self.worker_handlers)).state_api_service_descriptor()
+
+  @property
+  def process_bundle_descriptor(self):
+if self._process_bundle_descriptor is None:
+  self._process_bundle_descriptor = self._build_process_bundle_descriptor()
+return self._process_bundle_descriptor
+
+  def _build_process_bundle_descriptor(self):
+res = beam_fn_api_pb2.ProcessBundleDescriptor(
+id=self.bundle_uid,
+transforms={
+transform.unique_name: transform
+for transform in self.stage.transforms
+},
+pcollections=dict(
+self.execution_context.pipeline_components.pcollections.items()),
+   

[jira] [Work logged] (BEAM-9608) Add context managers for FnApiRunner to manage execution of each bundle

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9608?focusedWorklogId=411420=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411420
 ]

ASF GitHub Bot logged work on BEAM-9608:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11229: [BEAM-9608] 
Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399558218
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
 ##
 @@ -415,24 +412,24 @@ def _run_bundle_multiple_times_for_testing(
 cache_token_generator=cache_token_generator)
 testing_bundle_manager.process_bundle(data_input, data_output)
   finally:
-worker_handler.state.restore()
+runner_execution_context.worker_handler_manager.state_servicer.restore()
 
   def _collect_written_timers_and_add_to_deferred_inputs(
   self,
-  pipeline_components,  # type: beam_runner_api_pb2.Components
-  stage,  # type: translations.Stage
+  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
   bundle_context_manager,  # type: execution.BundleContextManager
   deferred_inputs,  # type: MutableMapping[str, PartitionableBuffer]
-  data_channel_coders,  # type: Mapping[str, str]
   ):
 # type: (...) -> None
 
-for transform_id, timer_writes in stage.timer_pcollections:
+for transform_id, timer_writes in \
 
 Review comment:
   backslash
 



Issue Time Tracking
---

Worklog Id: (was: 411420)
Time Spent: 1h 10m  (was: 1h)

> Add context managers for FnApiRunner to manage execution of each bundle
> ---
>
> Key: BEAM-9608
> URL: https://issues.apache.org/jira/browse/BEAM-9608
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Pablo Estrada
>Assignee: Pablo Estrada
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411417=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411417
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 22:03
Start Date: 27/Mar/20 22:03
Worklog Time Spent: 10m 
  Work Description: steveniemitz commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605334008
 
 
   looks great!  These tests will be really helpful to ensure we don't have 
more regressions in the future.  Thanks!
 



Issue Time Tracking
---

Worklog Id: (was: 411417)
Time Spent: 3h 10m  (was: 3h)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that 
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of 
> window 2020-03-19T17:59:59.999Z
> 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0.  Also notice that the error message 
> is wrong, it says that "event time timer" but the timer is in the processing 
> time domain.





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411414=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411414
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:58
Start Date: 27/Mar/20 21:58
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605332723
 
 
   @steveniemitz tests added
 



Issue Time Tracking
---

Worklog Id: (was: 411414)
Time Spent: 3h  (was: 2h 50m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Steve Niemitz
>Assignee: Reuven Lax
>Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Previously, it was possible to set a processing time timer past the end of a 
> window, and it would simply not fire.
> However, now, this results in an error:
> {code:java}
> java.lang.IllegalArgumentException: Attempted to set event time timer that 
> outputs for 2020-03-19T18:01:35.000Z but that is after the expiration of 
> window 2020-03-19T17:59:59.999Z
> 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setAndVerifyOutputTimestamp(SimpleDoFnRunner.java:1011)
> 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$TimerInternalsTimer.setRelative(SimpleDoFnRunner.java:934)
> .processElement(???.scala:187)
>  {code}
>  
> I think the regression was introduced in commit 
> a005fd765a762183ca88df90f261f6d4a20cf3e0. Also notice that the error message 
> is wrong: it says "event time timer", but the timer is in the processing 
> time domain.
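
The behavior the issue expects can be sketched in plain Java. This is a minimal, self-contained illustration, not Beam's actual code and not the change made in the linked pull requests: `TimerBoundarySketch`, `TimeDomain`, and `checkTimerTarget` are hypothetical names. The sketch assumes that only event-time timers should be compared against the window expiration, while a processing-time timer set past the window is accepted (and, per the description above, would simply not fire).

```java
import java.time.Instant;

// Hypothetical sketch: these names are illustrative, not Beam's SimpleDoFnRunner API.
final class TimerBoundarySketch {
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    /**
     * Validates a timer target against the window expiration.
     * Assumption: only event-time targets are compared with the window boundary,
     * because a processing-time target is measured on a different clock.
     */
    static void checkTimerTarget(TimeDomain domain, Instant target, Instant windowExpiration) {
        if (domain == TimeDomain.EVENT_TIME && target.isAfter(windowExpiration)) {
            throw new IllegalArgumentException(
                "Attempted to set event time timer for " + target
                    + " but that is after the expiration of window " + windowExpiration);
        }
        // Processing-time timers fall through: they are accepted even past the window.
    }

    public static void main(String[] args) {
        // Timestamps taken from the stack trace quoted in the issue.
        Instant windowExpiration = Instant.parse("2020-03-19T17:59:59.999Z");
        Instant target = Instant.parse("2020-03-19T18:01:35.000Z");

        checkTimerTarget(TimeDomain.PROCESSING_TIME, target, windowExpiration);
        System.out.println("processing-time timer accepted");

        try {
            checkTimerTarget(TimeDomain.EVENT_TIME, target, windowExpiration);
        } catch (IllegalArgumentException e) {
            System.out.println("event-time timer rejected: " + e.getMessage());
        }
    }
}
```

Because the exception is only thrown on the event-time path, the "event time timer" wording in the message can no longer be reported for a processing-time timer in this sketch.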



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411412=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411412
 ]

ASF GitHub Bot logged work on BEAM-8280:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:51
Start Date: 27/Mar/20 21:51
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11232: [BEAM-8280] 
Document Python 3 annotations support
URL: https://github.com/apache/beam/pull/11232#discussion_r399554700
 
 

 ##
 File path: website/src/documentation/sdks/python-type-safety.md
 ##
 @@ -64,22 +84,75 @@ To specify type hints as properties of a `DoFn` or 
`PTransform`, use the decorat
 
 The following code declares an `int` type hint on `FilterEvensDoFn`, using the 
decorator `@with_input_types()`.
 
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:type_hints_do_fn %}
 ```
-{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:type_hints_do_fn %}```
 
 Decorators receive an arbitrary number of positional and/or keyword arguments, 
typically interpreted in the context of the function they're wrapping. 
Generally the first argument is a type hint for the main input, and additional 
arguments are type hints for side inputs.
 
-### Defining Generic Types
+### Declaring Type Hints Using Annotations
 
 Review comment:
   "Declaring Type Hints Using Type Annotations" (might be a bit redundant, but 
is clearer). 
 



Issue Time Tracking
---

Worklog Id: (was: 411412)

> re-enable IOTypeHints.from_callable
> ---
>
> Key: BEAM-8280
> URL: https://issues.apache.org/jira/browse/BEAM-8280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Major
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> See https://issues.apache.org/jira/browse/BEAM-8279





[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411410=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411410
 ]

ASF GitHub Bot logged work on BEAM-8280:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:51
Start Date: 27/Mar/20 21:51
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11232: [BEAM-8280] 
Document Python 3 annotations support
URL: https://github.com/apache/beam/pull/11232#discussion_r399553756
 
 

 ##
 File path: website/src/documentation/sdks/python-type-safety.md
 ##
 @@ -23,38 +23,58 @@ Python is a dynamically-typed language with no static type 
checking. Because of
 
 The Apache Beam SDK for Python uses **type hints** during pipeline 
construction and runtime to try to emulate the correctness guarantees achieved 
by true static typing. Additionally, using type hints lays some groundwork that 
allows the backend service to perform efficient type deduction and registration 
of `Coder` objects.
 
-Python version 3.5 introduces a module called **typing** to provide hints for 
type validators in the language. The Beam SDK for Python, based on Python 
version 2.7, implements a subset of [PEP 
484](https://www.python.org/dev/peps/pep-0484/) and aims to follow it as 
closely as possible in its own typehints module.
+Python version 3.5 introduces a module called **typing** to provide hints for 
type validators in the language.
+The Beam SDK for Python implements a subset of [PEP 
484](https://www.python.org/dev/peps/pep-0484/) and aims to follow it as 
closely as possible in its own typehints module.
+
+These flags control Beam type safety:
+- `--no_pipeline_type_check`
+
+  Disables type checking during pipeline construction.
+  Default is to perform these checks.
+- `--runtime_type_check`
+
+  Enables runtime type checking of every element.
+  This may affect pipeline performance, so the default is to skip these checks.
 
 ## Benefits of Type Hints
 
-The Beam SDK for Python includes some automatic type checking: for example, 
some `PTransform`s, such as `Create` and simple `ParDo` transforms, attempt to 
deduce their output type given their input. However, the Beam cannot infer 
types in all cases. Therefore, the recommendation is that you declare type 
hints to aid you in performing your own type checks if necessary.
+The Beam SDK for Python includes some automatic type checking: for example, 
some `PTransforms`, such as `Create` and simple `ParDo` transforms, attempt to 
deduce their output type given their input. However, the Beam cannot infer 
types in all cases. Therefore, the recommendation is that you declare type 
hints to aid you in performing your own type checks if necessary.
 
-When you use type hints, the runner raises exceptions during pipeline 
construction time, rather than runtime. For example, the runner generates an 
exception if it detects that your pipeline applies mismatched `PTransforms` 
(where the expected outputs of one transform do not match the expected inputs 
of the following transform). These exceptions are raised at pipeline 
construction time, regardless of where your pipeline will execute. Introducing 
type hints for the `PTransform`s you define allows you to catch potential bugs 
up front in the local runner, rather than after minutes of execution into a 
deep, complex pipeline.
+When you use type hints, the runner raises exceptions during pipeline 
construction time, rather than runtime. For example, the runner generates an 
exception if it detects that your pipeline applies mismatched `PTransforms` 
(where the expected outputs of one transform do not match the expected inputs 
of the following transform). These exceptions are raised at pipeline 
construction time, regardless of where your pipeline will execute. Introducing 
type hints for the `PTransforms` you define allows you to catch potential bugs 
up front in the local runner, rather than after minutes of execution into a 
deep, complex pipeline.
 
 Review comment:
   I might lead with this paragraph, maybe even giving a very concrete example 
("for example, giving an error when trying to apply a PTransform that expects a 
PCollection of strings to a PCollection of ints"). Then put the other paragraph 
about Beam not always being able to infer types next. 
 



Issue Time Tracking
---

Worklog Id: (was: 411410)
Time Spent: 11h 40m  (was: 11.5h)

> re-enable IOTypeHints.from_callable
> ---
>
> Key: BEAM-8280
> URL: 

[jira] [Work logged] (BEAM-8280) re-enable IOTypeHints.from_callable

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8280?focusedWorklogId=411411=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411411
 ]

ASF GitHub Bot logged work on BEAM-8280:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:51
Start Date: 27/Mar/20 21:51
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #11232: [BEAM-8280] 
Document Python 3 annotations support
URL: https://github.com/apache/beam/pull/11232#discussion_r399554554
 
 

 ##
 File path: website/src/documentation/sdks/python-type-safety.md
 ##
 @@ -64,22 +84,75 @@ To specify type hints as properties of a `DoFn` or 
`PTransform`, use the decorat
 
 The following code declares an `int` type hint on `FilterEvensDoFn`, using the 
decorator `@with_input_types()`.
 
+```py
+{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:type_hints_do_fn %}
 ```
-{% github_sample 
/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py
 tag:type_hints_do_fn %}```
 
 Decorators receive an arbitrary number of positional and/or keyword arguments, 
typically interpreted in the context of the function they're wrapping. 
Generally the first argument is a type hint for the main input, and additional 
arguments are type hints for side inputs.
 
-### Defining Generic Types
+### Declaring Type Hints Using Annotations
 
 Review comment:
   This should be the preferred way; put it first. 
 



Issue Time Tracking
---

Worklog Id: (was: 411411)
Time Spent: 11h 40m  (was: 11.5h)

> re-enable IOTypeHints.from_callable
> ---
>
> Key: BEAM-8280
> URL: https://issues.apache.org/jira/browse/BEAM-8280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Major
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> See https://issues.apache.org/jira/browse/BEAM-8279





[jira] [Work logged] (BEAM-9512) Anonymous structs have name collision in schema

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9512?focusedWorklogId=411409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411409
 ]

ASF GitHub Bot logged work on BEAM-9512:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:49
Start Date: 27/Mar/20 21:49
Worklog Time Spent: 10m 
  Work Description: apilloud commented on pull request #11255: [BEAM-9512] 
Map anonymous structs to schema
URL: https://github.com/apache/beam/pull/11255
 
 
   Map anonymous structs to schemas.
   
   
   
   

[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411402
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:34
Start Date: 27/Mar/20 21:34
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605324797
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411402)
Time Spent: 2h 40m  (was: 2.5h)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411401=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411401
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:34
Start Date: 27/Mar/20 21:34
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605324746
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411401)
Time Spent: 2.5h  (was: 2h 20m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411403=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411403
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:34
Start Date: 27/Mar/20 21:34
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605324853
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411403)
Time Spent: 2h 50m  (was: 2h 40m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411398
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:30
Start Date: 27/Mar/20 21:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #11252: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11252#issuecomment-605323570
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 411398)
Time Spent: 2h 20m  (was: 2h 10m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411396
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:29
Start Date: 27/Mar/20 21:29
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #11226: [BEAM-9557] Fix timer 
window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605323118
 
 
   > @aaltay added new unit tests and fixed the broken one. However I'm 
noticing that test triggering seems broken on a number of PRs now. Is something 
wrong with the Jenkins master?
   
   Yes. Ongoing issue. @yifanzou is working on it. Repeating the test triggers 
a few times usually helps.
 



Issue Time Tracking
---

Worklog Id: (was: 411396)
Time Spent: 2h 10m  (was: 2h)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9557) Error setting processing time timers near end-of-window

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9557?focusedWorklogId=411393=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411393
 ]

ASF GitHub Bot logged work on BEAM-9557:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:25
Start Date: 27/Mar/20 21:25
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #11226: [BEAM-9557] Fix 
timer window boundary checking
URL: https://github.com/apache/beam/pull/11226#issuecomment-605321717
 
 
   @aaltay added new unit tests and fixed the broken one. However I'm noticing 
that test triggering seems broken on a number of PRs now. Is something wrong 
with the Jenkins master?
 



Issue Time Tracking
---

Worklog Id: (was: 411393)
Time Spent: 2h  (was: 1h 50m)

> Error setting processing time timers near end-of-window
> ---
>
> Key: BEAM-9557
> URL: https://issues.apache.org/jira/browse/BEAM-9557
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
> Reporter: Steve Niemitz
> Assignee: Reuven Lax
> Priority: Critical
> Fix For: 2.20.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-9331) The Row object needs better builders

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9331?focusedWorklogId=411390=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411390
 ]

ASF GitHub Bot logged work on BEAM-9331:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:20
Start Date: 27/Mar/20 21:20
Worklog Time Spent: 10m 
  Work Description: reuvenlax commented on issue #10883: [BEAM-9331] Add 
better Row builders
URL: https://github.com/apache/beam/pull/10883#issuecomment-605320135
 
 
   run sql postcommit
 



Issue Time Tracking
---

Worklog Id: (was: 411390)
Time Spent: 5h  (was: 4h 50m)

> The Row object needs better builders
> 
>
> Key: BEAM-9331
> URL: https://issues.apache.org/jira/browse/BEAM-9331
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
> Reporter: Reuven Lax
> Assignee: Reuven Lax
> Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Users should be able to build a Row object by specifying field names. Desired 
> syntax:
>  
> Row.withSchema(schema)
>   .withFieldName("field1", "value")
>   .withFieldName("field2.field3", value)
>   .build()
>  
> Users should also have a builder that allows taking an existing row and 
> changing specific fields.
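
The desired builder syntax can be illustrated with a small self-contained sketch. It is not Beam's `Row` or `Schema` API: `RowSketch`, `builder()`, `toBuilder()`, and `get()` are hypothetical names, fields are stored in nested maps, and schema validation is omitted. It mirrors the `withFieldName` calls quoted above, treats a dotted name such as `field2.field3` as a path into a nested row, and shows one possible way to derive a modified copy of an existing row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a row type; NOT Beam's Row or Schema API.
final class RowSketch {
    private final Map<String, Object> fields;

    private RowSketch(Map<String, Object> fields) {
        this.fields = fields;
    }

    static Builder builder() {
        return new Builder(new LinkedHashMap<>());
    }

    /** Starts a builder pre-populated with a deep copy of this row's values. */
    Builder toBuilder() {
        return new Builder(deepCopy(fields));
    }

    /** Looks up a value by dotted path, e.g. "field2.field3"; returns null if absent. */
    Object get(String path) {
        Object current = fields;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    private static Map<String, Object> deepCopy(Map<String, Object> source) {
        Map<String, Object> copy = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                copy.put(entry.getKey(), deepCopy(nested));
            } else {
                copy.put(entry.getKey(), value);
            }
        }
        return copy;
    }

    static final class Builder {
        private final Map<String, Object> fields;

        private Builder(Map<String, Object> fields) {
            this.fields = fields;
        }

        /** Sets a field by name; dots descend into nested rows, created on demand. */
        Builder withFieldName(String path, Object value) {
            String[] parts = path.split("\\.");
            Map<String, Object> current = fields;
            for (int i = 0; i < parts.length - 1; i++) {
                Object next = current.get(parts[i]);
                if (!(next instanceof Map)) {
                    next = new LinkedHashMap<String, Object>();
                    current.put(parts[i], next);
                }
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) next;
                current = nested;
            }
            current.put(parts[parts.length - 1], value);
            return this;
        }

        /** Builds an independent row; later builder calls do not affect it. */
        RowSketch build() {
            return new RowSketch(deepCopy(fields));
        }
    }

    public static void main(String[] args) {
        RowSketch row = RowSketch.builder()
            .withFieldName("field1", "value")
            .withFieldName("field2.field3", 42)
            .build();
        // Change one nested field while leaving the original row untouched.
        RowSketch changed = row.toBuilder().withFieldName("field2.field3", 7).build();
        System.out.println(row.get("field2.field3") + " -> " + changed.get("field2.field3"));
    }
}
```

Copying the nested maps in both `toBuilder()` and `build()` keeps each built row independent, which is what makes "take an existing row and change specific fields" safe in this sketch.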





[jira] [Work logged] (BEAM-5422) Update BigQueryIO DynamicDestinations documentation to clarify usage of getDestination() and getTable()

2020-03-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-5422?focusedWorklogId=411388=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-411388
 ]

ASF GitHub Bot logged work on BEAM-5422:


Author: ASF GitHub Bot
Created on: 27/Mar/20 21:16
Start Date: 27/Mar/20 21:16
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #11241: [BEAM-5422] Document 
DynamicDestinations.getTable uniqueness requirement
URL: https://github.com/apache/beam/pull/11241#issuecomment-605318740
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 411388)
Time Spent: 0.5h  (was: 20m)

> Update BigQueryIO DynamicDestinations documentation to clarify usage of 
> getDestination() and getTable()
> ---
>
> Key: BEAM-5422
> URL: https://issues.apache.org/jira/browse/BEAM-5422
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
> Reporter: Chamikara Madhusanka Jayalath
> Assignee: Chamikara Madhusanka Jayalath
> Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, there are some details related to these methods that should be 
> further clarified. For example, getTable() is expected to return a unique 
> value for each destination.




