moiseenkov opened a new pull request, #36974: URL: https://github.com/apache/airflow/pull/36974
This PR provides a small fix that adds the missing scopes to the token. Relates to #34727 and continues #36849 and #36903.

In our use case we have a DAG (see below) with a `BigQueryInsertJobOperator` that has the following characteristics:

- deferrable mode;
- the impersonation chain contains a service account;
- the BigQuery job operates on a dataset that belongs to a different GCP project.

In this case, instantiation of the newly implemented class [_CredentialsToken](https://github.com/king/airflow/blob/524540077694c861c64e8928fa7a9103b614b9bd/airflow/providers/google/common/hooks/base_google.py#L631) fails with an exception raised in the base class:

```
{bigquery.py:111} ERROR - Exception occurred while checking for query completion
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/google/cloud/triggers/bigquery.py", line 90, in run
    job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3300, in get_job_status
    job_client = await self.get_job_instance(project_id, job_id, s)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3290, in get_job_instance
    token = await self.get_token(session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 695, in get_token
    return await _CredentialsToken.from_hook(sync_hook, session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 658, in from_hook
    return cls(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 646, in __init__
    super().__init__(session=cast(Session, session))
  File "/usr/local/lib/python3.8/site-packages/gcloud/aio/auth/token.py", line 157, in __init__
    raise Exception(
Exception: scopes must be provided when token type is service account
```

This PR simply passes the hook's scopes into `_CredentialsToken`, so that the base class `Token` works properly.
---

**DAG**

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator,
)

ENV_ID = "TEST_ENV"
PROJECT_ID = "TEST_PROJECT"
DAG_ID = "example_bigquery_queries_async"
DATASET_NAME = f"bq_d_{DAG_ID}_{ENV_ID}".replace("-", "_")
LOCATION = "us"
TABLE_NAME_1 = f"bq_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")
SCHEMA = [
    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "ds", "type": "STRING", "mode": "NULLABLE"},
]
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)
SA = "impersonate-sa-exam...@developer.gserviceaccount.com"

with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "bigquery", "deferrable"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
) as dag:
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    create_table_1 = BigQueryCreateEmptyTableOperator(
        task_id="create_table_1",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME_1,
        schema_fields=SCHEMA,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    insert_query_job_imp = BigQueryInsertJobOperator(
        task_id="insert_query_job_imp",
        project_id=PROJECT_ID,
        configuration={
            "query": {
                "query": INSERT_ROWS_QUERY,
                "useLegacySql": False,
                "priority": "BATCH",
            }
        },
        location=LOCATION,
        impersonation_chain=SA,
        deferrable=True,
    )

    create_dataset >> create_table_1 >> insert_query_job_imp
```