[jira] [Work started] (BEAM-7933) Adding timeout to JobServer grpc calls

2019-09-26 Thread Enrico Canzonieri (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on BEAM-7933 started by Enrico Canzonieri.
---
> Adding timeout to JobServer grpc calls
> --
>
> Key: BEAM-7933
> URL: https://issues.apache.org/jira/browse/BEAM-7933
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-core
>Affects Versions: 2.14.0
>Reporter: Enrico Canzonieri
>Assignee: Enrico Canzonieri
>Priority: Minor
>  Labels: portability
>
> gRPC calls to the JobServer from the Python SDK do not have timeouts. That 
> means that the call to pipeline.run() could hang forever if the JobServer is 
> not running (or fails to start).
> For example, at 
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
>  the call to Prepare() does not pass any timeout value, and the same applies 
> to other JobServer requests.
> As part of this ticket, we could set a default timeout of 60 seconds on the 
> client's gRPC requests.
> Additionally, we could consider adding a --job-server-request-timeout option to the 
> [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
>  class to be used in the JobServer interactions inside portable_runner.py.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-7933) Adding timeout to JobServer grpc calls

2019-09-23 Thread Enrico Canzonieri (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936278#comment-16936278
 ] 

Enrico Canzonieri commented on BEAM-7933:
-

Yes, I'm planning to work on this. It shouldn't take me too long to get a PR 
out. I should have some time by the end of this week.



[jira] [Assigned] (BEAM-7933) Adding timeout to JobServer grpc calls

2019-08-09 Thread Enrico Canzonieri (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enrico Canzonieri reassigned BEAM-7933:
---

Assignee: Enrico Canzonieri



[jira] [Created] (BEAM-7933) Adding timeout to JobServer grpc calls

2019-08-08 Thread Enrico Canzonieri (JIRA)
Enrico Canzonieri created BEAM-7933:
---

 Summary: Adding timeout to JobServer grpc calls
 Key: BEAM-7933
 URL: https://issues.apache.org/jira/browse/BEAM-7933
 Project: Beam
  Issue Type: Improvement
  Components: sdk-py-core
Affects Versions: 2.14.0
Reporter: Enrico Canzonieri


gRPC calls to the JobServer from the Python SDK do not have timeouts. That 
means that the call to pipeline.run() could hang forever if the JobServer is not 
running (or fails to start).
For example, at 
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
 the call to Prepare() does not pass any timeout value, and the same applies 
to other JobServer requests.
As part of this ticket, we could set a default timeout of 60 seconds on the 
client's gRPC requests.
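A minimal sketch of how a 60-second default could be applied to every JobServer call, assuming the runner talks to the JobServer through a generated gRPC stub whose methods accept the standard gRPC Python `timeout` keyword argument (a per-call deadline in seconds). `TimeoutStub` and `DEFAULT_JOB_SERVER_TIMEOUT_SECS` are hypothetical names, not existing Beam code:

```python
# Hypothetical default, taken from the 60-second value proposed in this ticket.
DEFAULT_JOB_SERVER_TIMEOUT_SECS = 60


class TimeoutStub(object):
  """Hypothetical wrapper that gives each stub call a deadline by default.

  Attribute lookups are forwarded to the wrapped stub; callables are wrapped
  so that a `timeout` keyword argument is added when the caller omits one.
  """

  def __init__(self, stub, timeout=DEFAULT_JOB_SERVER_TIMEOUT_SECS):
    self._stub = stub
    self._timeout = timeout

  def __getattr__(self, name):
    # Only called for attributes not found on the wrapper itself, so
    # _stub and _timeout resolve normally without recursion.
    method = getattr(self._stub, name)
    if not callable(method):
      return method

    def call_with_timeout(request, **kwargs):
      # Respect an explicit per-call timeout; otherwise apply the default.
      kwargs.setdefault('timeout', self._timeout)
      return method(request, **kwargs)

    return call_with_timeout
```

With a real stub, e.g. `TimeoutStub(beam_job_api_pb2_grpc.JobServiceStub(channel))`, a Prepare() call that gets no response within the deadline should fail with a `grpc.RpcError` whose code is `DEADLINE_EXCEEDED` instead of blocking forever. One caveat: streaming calls such as GetStateStream would receive the same deadline and be cut off after 60 seconds, so a real change would likely exempt them or give them their own timeout.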
Additionally, we could consider adding a --job-server-request-timeout option to the 
[PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
 class to be used in the JobServer interactions inside portable_runner.py.
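Beam's pipeline option classes such as PortableOptions register their flags with an argparse parser in a `_add_argparse_args` classmethod. The sketch below registers the proposed flag the same way on a plain `argparse` parser; the flag name comes from this ticket, while the 60-second default and the helper names `add_job_server_timeout_argument` and `parse_timeout` are assumptions for illustration:

```python
import argparse


def add_job_server_timeout_argument(parser):
  """Registers the flag proposed in this ticket (hypothetical, not in Beam)."""
  parser.add_argument(
      '--job-server-request-timeout',
      dest='job_server_request_timeout',
      type=int,
      default=60,
      help='Timeout in seconds for each gRPC request sent to the JobServer.')


def parse_timeout(argv):
  """Returns the configured timeout, ignoring unrelated pipeline flags."""
  parser = argparse.ArgumentParser()
  add_job_server_timeout_argument(parser)
  # parse_known_args leaves flags it does not recognize in the second value.
  known_args, _ = parser.parse_known_args(argv)
  return known_args.job_server_request_timeout
```

Most existing Beam flags use underscores (for example `--job_endpoint`), so the final flag name may differ from the dashed spelling proposed here.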





[jira] [Commented] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-07-03 Thread Enrico Canzonieri (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878137#comment-16878137
 ] 

Enrico Canzonieri commented on BEAM-7678:
-

From my understanding, when we assign type hints to a DoFn, that should result 
in {{output.element_type}} being set to the proper type in: 
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]

I experimented with the following change, which seems to work:
{code}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index a0e8a72759..fa67242c36 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -640,7 +640,7 @@ class Pipeline(object):
   if len(transform_node.outputs) == 1:
     # The runner often has expectations about the output types as well.
     output, = transform_node.outputs.values()
-    output.element_type = transform_node.transform.infer_output_type(
+    output.element_type = output.element_type or transform_node.transform.infer_output_type(
         pcoll.element_type)
{code}
I'm not sure whether this would introduce any regressions, though.
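The proposed one-liner relies on Python's `or` short-circuiting: if the output PCollection already carries an element type (assumed here to have been set from `with_output_types`), that type is kept and `infer_output_type` is never called. A minimal stdlib-only model of that precedence, using hypothetical stand-in classes `StubTransform` and `StubPCollection` rather than Beam's real PTransform and PCollection:

```python
class StubTransform(object):
  """Hypothetical stand-in for a transform whose inferred type is fixed."""

  def __init__(self, inferred_type):
    self.inferred_type = inferred_type
    self.infer_calls = 0  # counts how often inference actually runs

  def infer_output_type(self, input_type):
    self.infer_calls += 1
    return self.inferred_type


class StubPCollection(object):
  """Hypothetical stand-in holding only an element_type attribute."""

  def __init__(self, element_type=None):
    self.element_type = element_type


def resolve_output_type(output, transform, input_type):
  # Same expression as the proposed patch: a type already set on the output
  # wins; inference only runs when element_type is None (or otherwise falsy).
  return output.element_type or transform.infer_output_type(input_type)
```

One place a regression could come from: if `element_type` is already populated with a generic placeholder type before this point, `or` keeps that placeholder and skips a more precise inferred type.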

> typehints with_output_types annotation doesn't work for stateful DoFn 
> --
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.13.0
>Reporter: Enrico Canzonieri
>Priority: Minor
>
> The output type hints seem to be ignored when using a stateful DoFn, but 
> the same type hint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code simply defines a stateful DoFn in Python. The runner used 
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  does not take the type hints into account.





[jira] [Created] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn

2019-07-02 Thread Enrico Canzonieri (JIRA)
Enrico Canzonieri created BEAM-7678:
---

 Summary: typehints with_output_types annotation doesn't work for stateful DoFn 
 Key: BEAM-7678
 URL: https://issues.apache.org/jira/browse/BEAM-7678
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Affects Versions: 2.13.0
Reporter: Enrico Canzonieri


The output type hints seem to be ignored when using a stateful DoFn, but 
the same type hint works perfectly when used without state. This issue prevents 
a custom Coder from being used because Beam will default to one of the 
{{FastCoders}} (I believe Pickle).

Example code:
{code}
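from apache_beam import typehints
from apache_beam.coders import VarIntCoder
from apache_beam.transforms import DoFn
from apache_beam.transforms.userstate import BagStateSpec

# Message is a user-defined class whose definition is not shown in this report.
@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):

  COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

  def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
    (key, messages) = element
    newMessage = Message()
    return [newMessage]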
{code}
The example code simply defines a stateful DoFn in Python. The runner used 
is the Flink 1.6.4 portable runner.

Finally, overriding {{infer_output_type}} to return a 
{{typehints.List[Message]}} solves the issue.

Looking at the code, it seems to me that 
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
 does not take the type hints into account.


