[
https://issues.apache.org/jira/browse/BEAM-14014?focusedWorklogId=769344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-769344
]
ASF GitHub Bot logged work on BEAM-14014:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/22 23:00
Start Date: 11/May/22 23:00
Worklog Time Spent: 10m
Work Description: tvalentyn commented on code in PR #17244:
URL: https://github.com/apache/beam/pull/17244#discussion_r870706037
##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
executing_project = worker_executing_project
-def get_service_credentials():
+def get_service_credentials(pipeline_options):
Review Comment:
Any concerns with setting `pipeline_options=None` as the default?
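A minimal sketch of what a `None` default could look like, assuming the options arrive either omitted or as a mapping of option names to values. The function body and its handling of an `impersonate_service_account` key are hypothetical stand-ins, not Beam's `auth` module. The point is only that a `None` default keeps existing zero-argument callers such as `get_service_credentials()` working unchanged.

```python
from typing import Any, Mapping, Optional


def get_service_credentials(
        pipeline_options: Optional[Mapping[str, Any]] = None) -> dict:
    """Hypothetical stand-in for auth.get_service_credentials.

    Returns a plain dict describing which credentials would be used, so the
    example runs without any Google libraries installed.
    """
    # Treat a missing argument and an empty mapping the same way.
    options = pipeline_options or {}
    account = options.get('impersonate_service_account')
    return {'impersonated': account is not None, 'account': account}


# Existing callers keep working without passing anything.
print(get_service_credentials())
# New callers can opt in to impersonation by passing options.
print(get_service_credentials(
    {'impersonate_service_account': 'sa@example.iam.gserviceaccount.com'}))
```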
##########
sdks/python/apache_beam/io/gcp/gcsfilesystem.py:
##########
@@ -45,6 +45,10 @@ class GCSFileSystem(FileSystem):
CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations
GCS_PREFIX = 'gs://'
+ def __init__(self, pipeline_options):
+ super().__init__(pipeline_options)
Review Comment:
Another possibility is to initialize the global credentials here, for example:
`_ = auth.get_service_credentials(pipeline_options)`
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
- credentials=auth.get_service_credentials(),
+ credentials=auth.get_service_credentials({}),
Review Comment:
Let's pass `None` instead (or make `None` the default value).
##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -160,3 +167,26 @@ def _get_service_credentials():
'Connecting anonymously.',
e)
return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if not pipeline_options:
+      return credentials
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    else:
+      impersonate_service_account = pipeline_options.get(
Review Comment:
When would this parameter not be a `PipelineOptions`?
If this value is somehow a dictionary, this call will crash with a `KeyError`.
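For reference, a defensive version of this branch could dispatch on the type explicitly and fall back to "no impersonation" for anything it does not recognize. Note that on a plain `dict`, `.get(key)` returns `None` for a missing key, while `options[key]` raises `KeyError`. `FakePipelineOptions` and `resolve_impersonation` below are hypothetical stand-ins used so the sketch runs without Beam installed; the code under review would call `view_as(GoogleCloudOptions)` instead.

```python
from typing import Any, Optional


class FakePipelineOptions:
    """Hypothetical stand-in for PipelineOptions (illustration only)."""

    def __init__(self, impersonate_service_account: Optional[str] = None):
        self.impersonate_service_account = impersonate_service_account


def resolve_impersonation(pipeline_options: Any) -> Optional[str]:
    """Returns the account to impersonate, or None if there is none."""
    if isinstance(pipeline_options, FakePipelineOptions):
        return pipeline_options.impersonate_service_account
    if isinstance(pipeline_options, dict):
        # dict.get returns None for a missing key instead of raising KeyError.
        return pipeline_options.get('impersonate_service_account')
    # None, or any unexpected type: no impersonation.
    return None


print(resolve_impersonation(
    FakePipelineOptions('sa@example.iam.gserviceaccount.com')))
print(resolve_impersonation({}))
print(resolve_impersonation(None))
```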
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1475,8 +1475,12 @@ def __init__(self, sink, test_bigquery_client=None,
buffer_size=None):
# If table schema did not define a project we default to executing project.
if self.project_id is None and hasattr(sink, 'pipeline_options'):
+ self._pipeline_options = sink.pipeline_options
Review Comment:
Do we need this?
##########
sdks/python/apache_beam/internal/gcp/auth.py:
##########
@@ -59,7 +73,7 @@ def set_running_in_gce(worker_executing_project):
executing_project = worker_executing_project
-def get_service_credentials():
+def get_service_credentials(pipeline_options):
Review Comment:
You can also mention in the docstring how pipeline options are used.
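A sketch of the kind of docstring this comment asks for, assuming the signature ends up as `get_service_credentials(pipeline_options=None)`. The wording and the described behavior are assumptions drawn from the diff above (options consulted for `impersonate_service_account`, `None` returned when connecting anonymously), not Beam's final documentation.

```python
from typing import Any, Optional


def get_service_credentials(pipeline_options: Optional[Any] = None):
    """Gets credentials for accessing Google Cloud services.

    Args:
      pipeline_options: Pipeline options (a PipelineOptions object or a
        dict), or None. They are consulted only for the
        ``impersonate_service_account`` option; when it is set, the default
        credentials are used to impersonate that service account. When
        pipeline_options is None, or the option is unset, the default
        credentials are returned unchanged.

    Returns:
      A credentials object, or None if no credentials are available and the
      client should connect anonymously.
    """
    # Body omitted in this sketch; only the docstring is illustrated.
    return None
```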
Issue Time Tracking
-------------------
Worklog Id: (was: 769344)
Time Spent: 5.5h (was: 5h 20m)
> Support impersonation credentials in Dataflow runner.
> -----------------------------------------------------
>
> Key: BEAM-14014
> URL: https://issues.apache.org/jira/browse/BEAM-14014
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Valentyn Tymofieiev
> Assignee: Ryan Thompson
> Priority: P2
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)