[jira] [Work started] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on BEAM-7933 started by Enrico Canzonieri.
-----------------------------------------------
> Adding timeout to JobServer grpc calls
> --------------------------------------
>
>                 Key: BEAM-7933
>                 URL: https://issues.apache.org/jira/browse/BEAM-7933
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.14.0
>            Reporter: Enrico Canzonieri
>            Assignee: Enrico Canzonieri
>            Priority: Minor
>              Labels: portability
>
> gRPC calls to the JobServer from the Python SDK do not have timeouts. That
> means that the call to pipeline.run() could hang forever if the JobServer is
> not running (or is failing to start).
> E.g. in
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
> the call to Prepare() doesn't provide any timeout value, and the same applies
> to other JobServer requests.
> As part of this ticket we could set 60 seconds as the default timeout for
> the http client.
> Additionally, we could consider adding a --job-server-request-timeout option to the
> [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
> class, to be used in the JobServer interactions inside portable_runner.py.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
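The ticket's main idea, a default timeout on every JobServer request, could be sketched roughly as below. This is only an illustration, not the Beam implementation: `with_default_timeout` and `FakeJobServiceStub` are hypothetical names, and the fake stub stands in for a generated gRPC stub so the snippet runs without a JobServer. The one fact it relies on is that gRPC Python stub methods accept a `timeout` keyword argument, expressed in seconds.

```python
# Hypothetical helper, not part of the Beam SDK.
DEFAULT_REQUEST_TIMEOUT_SECS = 60  # default value suggested in the ticket


def with_default_timeout(rpc, timeout=DEFAULT_REQUEST_TIMEOUT_SECS):
    """Wrap an RPC callable so each call carries a timeout unless one is given."""
    def call(request, **kwargs):
        # Only fill in the timeout when the caller did not pass one explicitly.
        kwargs.setdefault("timeout", timeout)
        return rpc(request, **kwargs)
    return call


class FakeJobServiceStub:
    """Stand-in for a generated gRPC stub; it just echoes the timeout it saw."""

    def Prepare(self, request, timeout=None):
        return {"request": request, "timeout": timeout}


stub = FakeJobServiceStub()
prepare = with_default_timeout(stub.Prepare)

print(prepare("prepare-request"))             # uses the 60 second default
print(prepare("prepare-request", timeout=5))  # an explicit timeout wins
```

With a real stub, a call that exceeds its timeout fails with a `grpc.RpcError` whose status code is `DEADLINE_EXCEEDED`, so `pipeline.run()` would fail instead of hanging.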
[jira] [Commented] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16936278#comment-16936278 ]

Enrico Canzonieri commented on BEAM-7933:
-----------------------------------------

Yes, I'm planning to work on this. It shouldn't take me too long to get a PR out. I should have some time by the end of this week.

> Adding timeout to JobServer grpc calls
> --------------------------------------
>
>                 Key: BEAM-7933
>                 URL: https://issues.apache.org/jira/browse/BEAM-7933
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.14.0
>            Reporter: Enrico Canzonieri
>            Assignee: Enrico Canzonieri
>            Priority: Minor
>              Labels: portability
>
> gRPC calls to the JobServer from the Python SDK do not have timeouts. That
> means that the call to pipeline.run() could hang forever if the JobServer is
> not running (or is failing to start).
> E.g. in
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
> the call to Prepare() doesn't provide any timeout value, and the same applies
> to other JobServer requests.
> As part of this ticket we could set 60 seconds as the default timeout for
> the http client.
> Additionally, we could consider adding a --job-server-request-timeout option to the
> [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
> class, to be used in the JobServer interactions inside portable_runner.py.
[jira] [Assigned] (BEAM-7933) Adding timeout to JobServer grpc calls
[ https://issues.apache.org/jira/browse/BEAM-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Enrico Canzonieri reassigned BEAM-7933:
--------------------------------------

    Assignee: Enrico Canzonieri

> Adding timeout to JobServer grpc calls
> --------------------------------------
>
>                 Key: BEAM-7933
>                 URL: https://issues.apache.org/jira/browse/BEAM-7933
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.14.0
>            Reporter: Enrico Canzonieri
>            Assignee: Enrico Canzonieri
>            Priority: Minor
>
> gRPC calls to the JobServer from the Python SDK do not have timeouts. That
> means that the call to pipeline.run() could hang forever if the JobServer is
> not running (or is failing to start).
> E.g. in
> [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307]
> the call to Prepare() doesn't provide any timeout value, and the same applies
> to other JobServer requests.
> As part of this ticket we could set 60 seconds as the default timeout for
> the http client.
> Additionally, we could consider adding a --job-server-request-timeout option to the
> [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805]
> class, to be used in the JobServer interactions inside portable_runner.py.
[jira] [Created] (BEAM-7933) Adding timeout to JobServer grpc calls
Enrico Canzonieri created BEAM-7933:
------------------------------------

             Summary: Adding timeout to JobServer grpc calls
                 Key: BEAM-7933
                 URL: https://issues.apache.org/jira/browse/BEAM-7933
             Project: Beam
          Issue Type: Improvement
          Components: sdk-py-core
    Affects Versions: 2.14.0
            Reporter: Enrico Canzonieri

gRPC calls to the JobServer from the Python SDK do not have timeouts. That means that the call to pipeline.run() could hang forever if the JobServer is not running (or is failing to start).

E.g. in [https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/portable_runner.py#L307] the call to Prepare() doesn't provide any timeout value, and the same applies to other JobServer requests.

As part of this ticket we could set 60 seconds as the default timeout for the http client.

Additionally, we could consider adding a --job-server-request-timeout option to the [PortableOptions|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L805] class, to be used in the JobServer interactions inside portable_runner.py.
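The proposed command-line option might look roughly like the sketch below. It uses plain `argparse` so that it runs on its own; the helper name `add_job_server_timeout_arg` is hypothetical. The flag spelling and the 60 second default come from the ticket text, not from any released Beam version. In Beam, the equivalent `add_argument` call would presumably live in the `PortableOptions` class the ticket links to.

```python
import argparse


def add_job_server_timeout_arg(parser):
    """Register the timeout flag proposed in the ticket (hypothetical helper)."""
    parser.add_argument(
        "--job-server-request-timeout",
        type=int,
        default=60,  # default suggested in the ticket
        help="Timeout, in seconds, for requests sent to the JobServer.")


parser = argparse.ArgumentParser()
add_job_server_timeout_arg(parser)

# argparse maps the hyphenated flag to the attribute job_server_request_timeout.
default_args = parser.parse_args([])
custom_args = parser.parse_args(["--job-server-request-timeout", "10"])

print(default_args.job_server_request_timeout)
print(custom_args.job_server_request_timeout)
```

The parsed value could then be passed as the `timeout` argument of each JobServer request.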
[jira] [Commented] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn
[ https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878137#comment-16878137 ]

Enrico Canzonieri commented on BEAM-7678:
-----------------------------------------

From my understanding, when we assign a typehint to a DoFn, that should result in {{output.element_type}} being set to the proper type in:
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]

I experimented with the following change, which seems to work:
{code}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index a0e8a72759..fa67242c36 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -640,7 +640,7 @@ class Pipeline(object):
       if len(transform_node.outputs) == 1:
         # The runner often has expectations about the output types as well.
         output, = transform_node.outputs.values()
-        output.element_type = transform_node.transform.infer_output_type(
+        output.element_type = output.element_type or transform_node.transform.infer_output_type(
             pcoll.element_type)
{code}
I'm not sure whether this will introduce any regression though.

> typehints with_output_types annotation doesn't work for stateful DoFn
> ---------------------------------------------------------------------
>
>                 Key: BEAM-7678
>                 URL: https://issues.apache.org/jira/browse/BEAM-7678
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.13.0
>            Reporter: Enrico Canzonieri
>            Priority: Minor
>
> The output type typehints seem to be ignored when using a stateful DoFn, but
> the same typehint works perfectly when used without state. This issue
> prevents a custom Coder from being used because Beam will default to one of
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn for Python. The runner used
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
> we do not take the typehints into account.
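The effect of the one-line change in the diff above can be illustrated without Beam. The sketch below mirrors only the `declared or inferred` fallback. `resolve_element_type`, `infer_fallback` and the local `Message` class are hypothetical stand-ins for illustration, not Beam APIs.

```python
class Message:
    """Stand-in for the user-defined type from the issue."""


def infer_fallback(input_type):
    """Stand-in for infer_output_type when no usable hint is found."""
    return object


def resolve_element_type(declared_type, infer_output_type, input_type):
    """Keep an already declared element type; otherwise fall back to inference."""
    # Same shape as the patched line:
    #   output.element_type = output.element_type or infer_output_type(...)
    return declared_type or infer_output_type(input_type)


# A declared type survives, so a custom Coder registered for it can be used.
print(resolve_element_type(Message, infer_fallback, int))
# Without a declared type, the inferred fallback is used as before.
print(resolve_element_type(None, infer_fallback, int))
```

Because `or` tests truthiness, any falsy declared type would also trigger the fallback. That is one place a regression of the kind the comment worries about could come from.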
[jira] [Created] (BEAM-7678) typehints with_output_types annotation doesn't work for stateful DoFn
Enrico Canzonieri created BEAM-7678:
------------------------------------

             Summary: typehints with_output_types annotation doesn't work for stateful DoFn
                 Key: BEAM-7678
                 URL: https://issues.apache.org/jira/browse/BEAM-7678
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.13.0
            Reporter: Enrico Canzonieri

The output type typehints seem to be ignored when using a stateful DoFn, but the same typehint works perfectly when used without state. This issue prevents a custom Coder from being used because Beam will default to one of the {{FastCoders}} (I believe Pickle).

Example code:
{code}
@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):
  COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
  def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
    (key, messages) = element
    newMessage = Message()
    return [newMessage]
{code}
The example code just defines a stateful DoFn for Python. The runner used is the Flink 1.6.4 portable runner.

Finally, overriding {{infer_output_type}} to return a {{typehints.List[Message]}} solves the issue.

Looking at the code, it seems to me that in [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643] we do not take the typehints into account.