moiseenkov opened a new pull request, #36974:
URL: https://github.com/apache/airflow/pull/36974

   This PR provides a small fix that passes the missing scopes into the token.
   
   Relates to #34727 and continues #36849, #36903.
   
   In our use case we have a DAG (see below) with `BigQueryInsertJobOperator` 
with the following characteristics:
   - deferrable mode;
   - impersonation chain contains service account;
   - the BigQuery job operates over a dataset that belongs to a different GCP 
project.
   
   In this case, instantiation of the newly implemented class 
[_CredentialsToken](https://github.com/king/airflow/blob/524540077694c861c64e8928fa7a9103b614b9bd/airflow/providers/google/common/hooks/base_google.py#L631)
 fails with an exception raised in the base class:
   ```
   {bigquery.py:111} ERROR - Exception occurred while checking for query 
completion
   Traceback (most recent call last):
     File "/opt/airflow/airflow/providers/google/cloud/triggers/bigquery.py", 
line 90, in run
       job_status = await hook.get_job_status(job_id=self.job_id, 
project_id=self.project_id)
     File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 
3300, in get_job_status
       job_client = await self.get_job_instance(project_id, job_id, s)
     File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 
3290, in get_job_instance
       token = await self.get_token(session=session)
     File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", 
line 695, in get_token
       return await _CredentialsToken.from_hook(sync_hook, session=session)
     File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", 
line 658, in from_hook
       return cls(
     File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", 
line 646, in __init__
       super().__init__(session=cast(Session, session))
     File "/usr/local/lib/python3.8/site-packages/gcloud/aio/auth/token.py", 
line 157, in __init__
       raise Exception(
   Exception: scopes must be provided when token type is service account
   ```
   
   This PR simply passes the hook's scopes into `_CredentialsToken`, so that the 
base class `Token` works properly.
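   For illustration only, here is a minimal, self-contained sketch of the failure mode and the fix. The classes below are hypothetical simplified stand-ins for `gcloud.aio.auth.token.Token` and Airflow's `_CredentialsToken` (the real classes also take sessions, service files, etc.); only the scopes handling is modeled:
   ```python
   # Hypothetical, simplified stand-ins; only the scopes check is modeled.


   class Token:
       """Simplified stand-in for gcloud.aio.auth.token.Token."""

       def __init__(self, scopes=None, token_type="service_account"):
           # The real base class raises exactly this error when a
           # service-account token is created without scopes.
           if token_type == "service_account" and not scopes:
               raise Exception(
                   "scopes must be provided when token type is service account"
               )
           self.scopes = scopes


   class _CredentialsToken(Token):
       """Simplified stand-in for the Airflow wrapper class."""

       def __init__(self, *, scopes=None):
           # Before the fix: the base class was initialized without scopes,
           # which raised for service-account credentials.
           # After the fix: the sync hook's scopes are forwarded here.
           super().__init__(scopes=scopes)


   # With the hook's scopes forwarded, instantiation succeeds:
   token = _CredentialsToken(scopes=["https://www.googleapis.com/auth/cloud-platform"])
   ```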
   
   ---
   **DAG**
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import (
       BigQueryCreateEmptyDatasetOperator,
       BigQueryCreateEmptyTableOperator,
       BigQueryInsertJobOperator,
   )
   
   ENV_ID = "TEST_ENV"
   PROJECT_ID = "TEST_PROJECT"
   
   DAG_ID = "example_bigquery_queries_async"
   
   DATASET_NAME = f"bq_d_{DAG_ID}_{ENV_ID}".replace("-", "_")
   LOCATION = "us"
   
   TABLE_NAME_1 = f"bq_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
   TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")
   
   SCHEMA = [
       {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
       {"name": "name", "type": "STRING", "mode": "NULLABLE"},
       {"name": "ds", "type": "STRING", "mode": "NULLABLE"},
   ]
   
   INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
   INSERT_ROWS_QUERY = (
       f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
       f"(42, 'monthy python', '{INSERT_DATE}'), "
       f"(42, 'fishy fish', '{INSERT_DATE}');"
   )
   SA = "impersonate-sa-exam...@developer.gserviceaccount.com"
   
   
   with DAG(
       dag_id=DAG_ID,
       schedule="@once",
       start_date=datetime(2022, 1, 1),
       catchup=False,
       tags=["example", "bigquery", "deferrable"],
       user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
   ) as dag:
   
       create_dataset = BigQueryCreateEmptyDatasetOperator(
           task_id="create_dataset",
           dataset_id=DATASET_NAME,
           location=LOCATION,
           project_id=PROJECT_ID,
           impersonation_chain=SA,
       )
   
       create_table_1 = BigQueryCreateEmptyTableOperator(
           task_id="create_table_1",
           dataset_id=DATASET_NAME,
           table_id=TABLE_NAME_1,
           schema_fields=SCHEMA,
           location=LOCATION,
           project_id=PROJECT_ID,
           impersonation_chain=SA,
       )
   
       insert_query_job_imp = BigQueryInsertJobOperator(
           task_id="insert_query_job_imp",
           project_id=PROJECT_ID,
           configuration={
               "query": {
                   "query": INSERT_ROWS_QUERY,
                   "useLegacySql": False,
                   "priority": "BATCH",
               }
           },
           location=LOCATION,
           impersonation_chain=SA,
           deferrable=True,
       )
   
       create_dataset >> create_table_1 >> insert_query_job_imp
   
   ```

