tvalentyn commented on code in PR #17244: URL: https://github.com/apache/beam/pull/17244#discussion_r870706037
########## sdks/python/apache_beam/internal/gcp/auth.py: ########## @@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project): executing_project = worker_executing_project -def get_service_credentials(): +def get_service_credentials(pipeline_options): Review Comment: Any concerns with setting `pipeline_options=None`? ########## sdks/python/apache_beam/io/gcp/gcsfilesystem.py: ########## @@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem): CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations GCS_PREFIX = 'gs://' + def __init__(self, pipeline_options): + super().__init__(pipeline_options) Review Comment: Another possibility is to initialize the global credential here, like: _ = auth.get_service_credentials(pipeline_options) ########## sdks/python/apache_beam/io/gcp/bigquery_tools.py: ########## @@ -328,7 +328,7 @@ class BigQueryWrapper(object): def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): self.client = client or bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials(), + credentials=auth.get_service_credentials({}), Review Comment: Let's pass `None` (or make `None` the default value). ########## sdks/python/apache_beam/internal/gcp/auth.py: ########## @@ -160,3 +167,26 @@ def _get_service_credentials(): 'Connecting anonymously.', e) return None + + @staticmethod + def _add_impersonation_credentials(credentials, pipeline_options): + if not pipeline_options: + return credentials + if isinstance(pipeline_options, PipelineOptions): + gcs_options = pipeline_options.view_as(GoogleCloudOptions) + impersonate_service_account = gcs_options.impersonate_service_account + else: + impersonate_service_account = pipeline_options.get( Review Comment: When would this param not be a `PipelineOptions` instance? If somehow this value is a dictionary, this call will crash with a key error. 
########## sdks/python/apache_beam/io/gcp/bigquery_tools.py: ########## @@ -1475,8 +1475,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None): # If table schema did not define a project we default to executing project. if self.project_id is None and hasattr(sink, 'pipeline_options'): + self._pipeline_options = sink.pipeline_options Review Comment: Do we need this? ########## sdks/python/apache_beam/internal/gcp/auth.py: ########## @@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project): executing_project = worker_executing_project -def get_service_credentials(): +def get_service_credentials(pipeline_options): Review Comment: You can also mention in the docstring how pipeline options are used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org