[ 
https://issues.apache.org/jira/browse/BEAM-5442?focusedWorklogId=191804&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-191804
 ]

ASF GitHub Bot logged work on BEAM-5442:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jan/19 17:03
            Start Date: 29/Jan/19 17:03
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #7597: [BEAM-5442] 
Retrieve valid runner options from JobService
URL: https://github.com/apache/beam/pull/7597#discussion_r251926546
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/portable_runner.py
 ##########
 @@ -189,20 +189,60 @@ def run_pipeline(self, pipeline, options):
       proto_pipeline = fn_api_runner_transforms.with_stages(
           proto_pipeline, stages)
 
-    # TODO: Define URNs for options.
-    # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
-    p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
-                 for k, v in options.get_all_options().items()
-                 if v is not None}
-
     if not job_service:
       channel = grpc.insecure_channel(job_endpoint)
       grpc.channel_ready_future(channel).result()
       job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
     else:
       channel = None
 
+    # fetch runner options from job service
+    def send_options_request(max_retries=5):
+      num_retries = 0
+      while True:
+        try:
+          # This reports channel is READY but connections may fail
+          # Seems to be only an issue on Mac with port forwardings
+          if channel:
+            grpc.channel_ready_future(channel).result()
+          return job_service.DescribePipelineOptions(
+              beam_job_api_pb2.DescribePipelineOptionsRequest())
+        except grpc._channel._Rendezvous as e:
+          num_retries += 1
+          if num_retries > max_retries:
+            raise e
+
+    options_response = send_options_request()
+
+    def add_runner_options(parser):
+      for option in options_response.options:
+        try:
+          # no default values - we don't want runner options
+          # added unless they were specified by the user
+          # TODO: types
+          action = 'store'
+          if option.type == 'Boolean':
+            action = 'store_true'
+          parser.add_argument("--%s" % option.name,
+                              action=action,
+                              help=option.description
+                             )
+        except Exception as e:
+          # ignore runner options that are already present
+          # only in this case is duplicate not treated as error
+          if 'conflicting option string' not in str(e):
+            raise
 
 Review comment:
   >This is not a scoping issue, these are the same options defined in 2 places.
   
   If they were scoped, they wouldn't be defined twice :) I agree though that 
this is an old issue which can be addressed separately.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 191804)
    Time Spent: 24h 10m  (was: 24h)

> PortableRunner swallows custom options for Runner
> -------------------------------------------------
>
>                 Key: BEAM-5442
>                 URL: https://issues.apache.org/jira/browse/BEAM-5442
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Maximilian Michels
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability, portability-flink
>          Time Spent: 24h 10m
>  Remaining Estimate: 0h
>
> The PortableRunner doesn't pass custom PipelineOptions to the executing 
> Runner.
> Example: {{--parallelism=4}} won't be forwarded to the FlinkRunner.
> (The option is just removed during proto translation without any warning)
> We should allow some form of customization through the options, even for the 
> PortableRunner. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to