tvalentyn commented on code in PR #17244:
URL: https://github.com/apache/beam/pull/17244#discussion_r870706037


##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   Any concerns with setting `pipeline_options=None` as the default?
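A minimal sketch of what a `None` default could look like, so that existing call sites that pass nothing keep working unchanged. `_fetch_default_credentials` is a hypothetical stand-in for the existing credential lookup, and a plain dict stands in for `PipelineOptions` to keep the example self-contained; the dict-shaped "credentials" are placeholders, not real credential objects.

```python
from typing import Any, Mapping, Optional


def _fetch_default_credentials():
  # Hypothetical stand-in for the existing default-credential lookup.
  return {'source': 'default'}


def get_service_credentials(
    pipeline_options: Optional[Mapping[str, Any]] = None):
  """Returns default credentials, impersonating a service account if asked.

  pipeline_options defaults to None, so callers that have no options
  (e.g. get_service_credentials()) behave exactly as before the change.
  """
  credentials = _fetch_default_credentials()
  # Treat None and an empty mapping the same way: no impersonation.
  target = (pipeline_options or {}).get('impersonate_service_account')
  if target:
    # Hypothetical: record the impersonation target on the credentials.
    credentials = dict(credentials, impersonated=target)
  return credentials
```

With this default, `get_service_credentials()`, `get_service_credentials(None)` and `get_service_credentials({})` all return the plain default credentials.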



##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chunk size in batch operations
   GCS_PREFIX = 'gs://'
 
+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)

Review Comment:
   Another possibility is to initialize the global credential here, for example:
   
   _ = auth.get_service_credentials(pipeline_options)
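A rough sketch of the eager-initialization idea, assuming the auth module keeps one process-wide credential that is built on first use. The lock, the `_initialized`/`_credentials` globals, and the `FileSystem` base class below are hypothetical stand-ins, not Beam's actual implementation.

```python
import threading
from typing import Any, Mapping, Optional

# Hypothetical process-wide cache standing in for the global credential
# kept by the auth module.
_lock = threading.Lock()
_initialized = False
_credentials = None


def get_service_credentials(
    pipeline_options: Optional[Mapping[str, Any]] = None):
  """Builds credentials on the first call only and returns the cached value."""
  global _initialized, _credentials
  with _lock:
    if not _initialized:
      target = (pipeline_options or {}).get('impersonate_service_account')
      _credentials = {'impersonated': target}
      _initialized = True
    return _credentials


class FileSystem(object):
  """Hypothetical minimal stand-in for Beam's FileSystem base class."""
  def __init__(self, pipeline_options):
    self._pipeline_options = pipeline_options


class GCSFileSystem(FileSystem):
  def __init__(self, pipeline_options):
    super().__init__(pipeline_options)
    # Warm the global credential while the options are available, so later
    # calls that pass no options still pick up the impersonation setting.
    _ = get_service_credentials(pipeline_options)
```

The trade-off is "first call wins": once the cache is warm, a later call with different options still returns the credential built from the first options, which is worth keeping in mind if several pipelines share a process.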



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials({}),

Review Comment:
   Let's pass `None` here (or make `None` the default value).



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -160,3 +167,26 @@ def _get_service_credentials():
           'Connecting anonymously.',
           e)
       return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if not pipeline_options:
+      return credentials
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    else:
+      impersonate_service_account = pipeline_options.get(

Review Comment:
   When would this parameter not be a `PipelineOptions` instance?
   If this value is somehow a dictionary, this call will crash with a `KeyError`.
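If dictionaries really do need to be supported, one defensive shape is sketched below; otherwise it may be simpler to accept only `PipelineOptions` (converting a dict once up front, for example with `PipelineOptions.from_dictionary`) and drop the dict branch. `FakePipelineOptions`, `FakeGoogleCloudOptions`, and `resolve_impersonation_target` are hypothetical names used only to keep the sketch self-contained; they model just the `view_as()` lookup, not the real classes.

```python
from collections.abc import Mapping
from typing import Any, Optional


class FakeGoogleCloudOptions(object):
  """Hypothetical stand-in for Beam's GoogleCloudOptions view."""
  def __init__(self, impersonate_service_account=None):
    self.impersonate_service_account = impersonate_service_account


class FakePipelineOptions(object):
  """Hypothetical stand-in for PipelineOptions; only view_as() is modeled."""
  def __init__(self, impersonate_service_account=None):
    self._gcp = FakeGoogleCloudOptions(impersonate_service_account)

  def view_as(self, view_cls):
    return self._gcp


def resolve_impersonation_target(pipeline_options: Any) -> Optional[str]:
  """Returns the service account to impersonate, or None."""
  if not pipeline_options:
    # None or an empty dict: nothing to impersonate.
    return None
  if isinstance(pipeline_options, FakePipelineOptions):
    view = pipeline_options.view_as(FakeGoogleCloudOptions)
    return view.impersonate_service_account
  if isinstance(pipeline_options, Mapping):
    # .get() yields None for a missing key instead of raising KeyError.
    return pipeline_options.get('impersonate_service_account')
  # Anything else is a programming error; fail loudly with a clear message.
  raise TypeError(
      'Expected PipelineOptions, a mapping, or None; got %r' %
      type(pipeline_options))
```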



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1475,8 +1475,12 @@ def __init__(self, sink, test_bigquery_client=None, buffer_size=None):
 
     # If table schema did not define a project we default to executing project.
     if self.project_id is None and hasattr(sink, 'pipeline_options'):
+      self._pipeline_options = sink.pipeline_options

Review Comment:
   Do we need this?



##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):

Review Comment:
   You can also mention in the docstring how pipeline options are used.
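A possible docstring, sketched under the assumption that the only option consulted is `impersonate_service_account` and that a `None` return still means "connect anonymously", as in the existing fallback path. The function body and `_fetch_default_credentials` are hypothetical placeholders; only the docstring wording is the point here.

```python
from typing import Any, Mapping, Optional


def _fetch_default_credentials():
  # Hypothetical stand-in: None means no credentials were found.
  return None


def get_service_credentials(
    pipeline_options: Optional[Mapping[str, Any]] = None):
  """Gets credentials for use with Google Cloud services.

  Args:
    pipeline_options: Optional pipeline options. If the
      ``impersonate_service_account`` option is set, the returned credentials
      act as that service account. If ``None`` (the default) or the option is
      unset, the default credentials are returned unchanged.

  Returns:
    A credentials object, or ``None`` if no credentials could be found, in
    which case the caller connects anonymously.
  """
  credentials = _fetch_default_credentials()
  target = (pipeline_options or {}).get('impersonate_service_account')
  if credentials is None or not target:
    return credentials
  # Hypothetical: wrap the credentials for impersonation.
  return {'base': credentials, 'impersonated': target}
```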



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

