aredshift opened a new issue, #34101:
URL: https://github.com/apache/beam/issues/34101

   ### What happened?
   
   Launching a Google Cloud Dataflow pipeline from a VSCode Jupyter notebook with Beam Python SDK 2.63.0 on Python 3.11 fails to parse pipeline options that are accepted when the same code runs as a plain Python script (and that previously worked with Beam 2.52.0 on Python 3.8).
   
   The `flexrs_goal` flag is being passed the Jupyter kernel connection file.
   
   My best guess is that the `--f /<user-home>/.local/share/jupyter/runtime/<connection_file>.json` argument passed to the `ipykernel_launcher.py` script interacts with `argparse`'s `allow_abbrev` option (enabled by default), so the `--f` prefix is expanded to `--flexrs_goal`.
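
   A stdlib-only sketch of that suspected interaction (the connection-file path below is hypothetical): with `argparse`'s default `allow_abbrev=True`, the unambiguous prefix `--f` is expanded to `--flexrs_goal`, whose value then fails the `choices` check; with abbreviations disabled, the flag is simply passed through as unknown.

   ```python
   import argparse

   # allow_abbrev defaults to True, mirroring Beam's option parser.
   parser = argparse.ArgumentParser(prog='ipykernel_launcher.py')
   parser.add_argument(
       '--flexrs_goal', choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'])

   # Hypothetical stand-in for the kernel connection-file argument.
   argv = ['--f=/home/user/.local/share/jupyter/runtime/kernel-abc.json']

   try:
       parser.parse_known_args(argv)
   except SystemExit as exc:
       # --f was abbreviated to --flexrs_goal; the path fails the choices check.
       print('parse failed with exit code', exc.code)

   # With abbreviation disabled, the unrecognized flag is passed through.
   no_abbrev = argparse.ArgumentParser(allow_abbrev=False)
   no_abbrev.add_argument(
       '--flexrs_goal', choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'])
   _, unknown = no_abbrev.parse_known_args(argv)
   print('unknown args:', unknown)
   ```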
   
   Printing `sys.argv` in the failing cell yields: `['/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/ipykernel_launcher.py', '--f=<path>.json']` (I believe the argv is constructed by [the VSCode plugin here](https://github.com/microsoft/vscode-jupyter/blob/245979fbb9ae3c27eefef9777c16b8da5dd00d04/src/kernels/raw/launcher/kernelProcess.node.ts#L547)).
   
   I'm not sure what changed between Beam 2.52.0 and 2.63.0, or between Python 3.8 and 3.11, that triggered this regression. According to [this doc](https://jupyter-client.readthedocs.io/en/latest/kernels.html#kernelspecs), the kernel expects a `-f` flag, yet it successfully parses the `--f` variant that the VSCode plugin passes.
   
   Output:
   ```
   ... logs ...
   usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                                [--project PROJECT] [--job_name JOB_NAME]
                                [--staging_location STAGING_LOCATION]
                                [--temp_location TEMP_LOCATION] [--region 
REGION]
                                [--service_account_email SERVICE_ACCOUNT_EMAIL]
                                [--no_auth]
                                [--template_location TEMPLATE_LOCATION]
                                [--label LABELS] [--update]
                                [--transform_name_mapping 
TRANSFORM_NAME_MAPPING]
                                [--enable_streaming_engine]
                                [--dataflow_kms_key DATAFLOW_KMS_KEY]
                                [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                                [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                                [--dataflow_service_option 
DATAFLOW_SERVICE_OPTIONS]
                                [--enable_hot_key_logging]
                                [--enable_artifact_caching]
                                [--impersonate_service_account 
IMPERSONATE_SERVICE_ACCOUNT]
                                [--gcp_oauth_scope GCP_OAUTH_SCOPES]
                                [--enable_bucket_read_metric_counter]
                                [--enable_bucket_write_metric_counter]
                                [--no_gcsio_throttling_counter]
                                [--enable_gcsio_blob_generation]
   ipykernel_launcher.py: error: argument --flexrs_goal: invalid choice: 
'/<usr-home>/.local/share/jupyter/runtime/kernel-<id>.json' (choose from 
'COST_OPTIMIZED', 'SPEED_OPTIMIZED')
   ```
   <details open>
     <summary><h2>Traceback</h2></summary>
   
   ```
   ... pipeline = beam.Pipeline(options=pipeline_options)
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/pipeline.py:211,
 in Pipeline.__init__(self, runner, options, argv, display_data)
       206   raise TypeError(
       207       'Runner %s is not a PipelineRunner object or the '
       208       'name of a registered runner.' % runner)
       210 # Validate pipeline options
   --> 211 errors = PipelineOptionsValidator(self._options, runner).validate()
       212 if errors:
       213   raise ValueError(
       214       'Pipeline has validations errors: \n' + '\n'.join(errors))
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options_validator.py:149,
 in PipelineOptionsValidator.validate(self)
       147 for cls in self.OPTIONS:
       148   if 'validate' in cls.__dict__ and 
callable(cls.__dict__['validate']):
   --> 149     errors.extend(self.options.view_as(cls).validate(self))
       150 return errors
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:1023,
 in GoogleCloudOptions.validate(self, validator)
      1021 errors = []
      1022 if validator.is_service_runner():
   -> 1023   errors.extend(self._handle_temp_and_staging_locations(validator))
      1024   errors.extend(validator.validate_cloud_options(self))
      1026 if self.view_as(DebugOptions).dataflow_job_file:
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:1004,
 in GoogleCloudOptions._handle_temp_and_staging_locations(self, validator)
      1002   return []
      1003 elif not staging_errors and not temp_errors:
   -> 1004   self._warn_if_soft_delete_policy_enabled('temp_location')
      1005   self._warn_if_soft_delete_policy_enabled('staging_location')
      1006   return []
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:978,
 in GoogleCloudOptions._warn_if_soft_delete_policy_enabled(self, arg_name)
       976 try:
       977   from apache_beam.io.gcp import gcsio
   --> 978   if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
       979     _LOGGER.warning(
       980         "Bucket specified in %s has soft-delete policy enabled."
       981         " To avoid being billed for unnecessary storage costs, turn"
      (...)
        985         " https://cloud.google.com/storage/docs/use-soft-delete"
       986         "#remove-soft-delete-policy." % arg_name)
       987 except ImportError:
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py:151,
 in GcsIO.__init__(self, storage_client, pipeline_options)
       149   pipeline_options = 
PipelineOptions.from_dictionary(pipeline_options)
       150 if storage_client is None:
   --> 151   storage_client = create_storage_client(pipeline_options)
       153 google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
       154 self.enable_read_bucket_metric = getattr(
       155     google_cloud_options, 'enable_bucket_read_metric_counter', False)
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py:119,
 in create_storage_client(pipeline_options, use_credentials)
       107 """Create a GCS client for Beam via GCS Client Library.
       108 
       109 Args:
      (...)
       116   A google.cloud.storage.client.Client instance.
       117 """
       118 if use_credentials:
   --> 119   credentials = auth.get_service_credentials(pipeline_options)
       120 else:
       121   credentials = None
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:82,
 in get_service_credentials(pipeline_options)
        68 def get_service_credentials(pipeline_options):
        69   # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
        71   """For internal use only; no backwards-compatibility guarantees.
        72 
        73   Get credentials to access Google services.
      (...)
        80     not found. Returned object is thread-safe.
        81   """
   ---> 82   return _Credentials.get_service_credentials(pipeline_options)
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:139,
 in _Credentials.get_service_credentials(cls, pipeline_options)
       135     socket.setdefaulttimeout(60)
       136   _LOGGER.debug(
       137       "socket default timeout is %s seconds.", 
socket.getdefaulttimeout())
   --> 139   cls._credentials = cls._get_service_credentials(pipeline_options)
       140   cls._credentials_init = True
       142 return cls._credentials
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:156,
 in _Credentials._get_service_credentials(pipeline_options)
       152   return None
       154 try:
       155   # pylint: disable=c-extension-no-member
   --> 156   credentials = 
_Credentials._get_credentials_with_retrys(pipeline_options)
       157   credentials = _Credentials._add_impersonation_credentials(
       158       credentials, pipeline_options)
       159   credentials = _ApitoolsCredentialsAdapter(credentials)
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/utils/retry.py:298,
 in with_exponential_backoff.<locals>.real_decorator.<locals>.wrapper(*args, 
**kwargs)
       296 while True:
       297   try:
   --> 298     return fun(*args, **kwargs)
       299   except Exception as exn:  # pylint: disable=broad-except
       300     if not retry_filter(exn):
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:176,
 in _Credentials._get_credentials_with_retrys(pipeline_options)
       172 @staticmethod
       173 @retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
       174 def _get_credentials_with_retrys(pipeline_options):
       175   credentials, _ = google.auth.default(
   --> 176     
scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
       177   return credentials
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:467,
 in PipelineOptions.view_as(self, cls)
       445 def view_as(self, cls: Type[PipelineOptionsT]) -> PipelineOptionsT:
       446   """Returns a view of current object as provided PipelineOption 
subclass.
       447 
       448   Example Usage::
      (...)
       465 
       466   """
   --> 467   view = cls(self._flags)
       469   for option_name in view._visible_option_list():
       470     # Initialize values of keys defined by a cls.
       471     #
      (...)
       475     # backed by the same list across multiple views, and that any 
overrides of
       476     # pipeline options already stored in _all_options are preserved.
       477     if option_name not in self._all_options:
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:237,
 in PipelineOptions.__init__(self, flags, **kwargs)
       233     cls._add_argparse_args(parser)  # type: ignore
       235 # The _visible_options attribute will contain options that were 
recognized
       236 # by the parser.
   --> 237 self._visible_options, _ = parser.parse_known_args(flags)
       239 # self._all_options is initialized with overrides to flag values,
       240 # provided in kwargs, and will store key-value pairs for options 
recognized
       241 # by current PipelineOptions [sub]class and its views that may be 
created.
      (...)
       244 # as each new views are created.
       245 # Users access this dictionary store via __getattr__ / __setattr__ 
methods.
       246 self._all_options = kwargs
   
   File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:1909, in 
ArgumentParser.parse_known_args(self, args, namespace)
      1907         namespace, args = self._parse_known_args(args, namespace)
      1908     except ArgumentError as err:
   -> 1909         self.error(str(err))
      1910 else:
      1911     namespace, args = self._parse_known_args(args, namespace)
   
   File 
/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:137,
 in _BeamArgumentParser.error(self, message)
       135 if message.startswith('ambiguous option: '):
       136   return
   --> 137 super().error(message)
   
   File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:2640, in 
ArgumentParser.error(self, message)
      2638 self.print_usage(_sys.stderr)
      2639 args = {'prog': self.prog, 'message': message}
   -> 2640 self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
   
   File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:2627, in 
ArgumentParser.exit(self, status, message)
      2625 if message:
      2626     self._print_message(message, _sys.stderr)
   -> 2627 _sys.exit(status)
   
   SystemExit: 2
   ```
   </details>
   
   ## Environment
   - OS: Debian GNU/Linux 12 (bookworm)
   - Architecture:             aarch64
      - CPU op-mode(s):         64-bit
      - Byte Order:             Little Endian
   - Python version: 3.11.11
   - Beam version: 2.63.0 (also fails on 2.61.0)
   - IPykernel version: 6.29.5
   - Jupyter client version: 8.6.3
   - Jupyter server version: 2.15.0
   - Jupyter core version: 5.7.2
   - Interfacing via VSCode devcontainer
   - Environment managed by Conda(micromamba)/Poetry
   
   ## Things I've tried
   - Populating the `flexrs_goal` option explicitly instead of omitting it
   - Running the same program outside of a Jupyter notebook (successful)
   - Running with Python 3.8 & Beam 2.52.0 (successful)
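
   A possible workaround sketch (not verified against Beam 2.63.0): `PipelineOptions` falls back to `sys.argv` when constructed without an explicit `flags` list, which is why the failure only surfaces inside the notebook. The stdlib analogue below shows the difference; the Beam equivalent would be constructing `PipelineOptions(flags=[...])` with an explicit list so the kernel's `--f=...` argument is never parsed.

   ```python
   import argparse
   import sys

   # Mimic the notebook environment, where the kernel's connection-file
   # argument sits in sys.argv (hypothetical path).
   sys.argv = ['ipykernel_launcher.py', '--f=/tmp/kernel-abc.json']

   parser = argparse.ArgumentParser()
   parser.add_argument(
       '--flexrs_goal', choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'])

   # Passing None makes argparse read sys.argv[1:], just as PipelineOptions
   # does when it is constructed without an explicit flags list.
   try:
       parser.parse_known_args(None)
   except SystemExit:
       print('fails: options were taken from sys.argv')

   # An explicit list (even an empty one) sidesteps sys.argv entirely.
   namespace, unknown = parser.parse_known_args([])
   print('succeeds: flexrs_goal =', namespace.flexrs_goal)
   ```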
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [x] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [x] Component: Google Cloud Dataflow Runner

