This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new cf6324e  Implement BigQuery Table Schema Update Operator (#15367)
cf6324e is described below

commit cf6324e43b2f7c183c3872704733b69d1498cda1
Author: Jens Larsson <[email protected]>
AuthorDate: Tue May 4 18:21:08 2021 +0200

    Implement BigQuery Table Schema Update Operator (#15367)
    
    Co-authored-by: Jens Larsson <[email protected]>
---
 .../example_dags/example_bigquery_operations.py    |  14 ++
 airflow/providers/google/cloud/hooks/bigquery.py   |  95 ++++++++++++
 .../providers/google/cloud/operators/bigquery.py   | 112 ++++++++++++++
 .../operators/cloud/bigquery.rst                   |  17 ++
 .../providers/google/cloud/hooks/test_bigquery.py  | 172 +++++++++++++++++++++
 .../google/cloud/operators/test_bigquery.py        |  31 ++++
 6 files changed, 441 insertions(+)

diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py b/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
index b487efb..a72ff36 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
@@ -36,6 +36,7 @@ from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryPatchDatasetOperator,
     BigQueryUpdateDatasetOperator,
     BigQueryUpdateTableOperator,
+    BigQueryUpdateTableSchemaOperator,
     BigQueryUpsertTableOperator,
 )
 from airflow.utils.dates import days_ago
@@ -73,6 +74,18 @@ with models.DAG(
     )
     # [END howto_operator_bigquery_create_table]
 
+    # [START howto_operator_bigquery_update_table_schema]
+    update_table_schema = BigQueryUpdateTableSchemaOperator(
+        task_id="update_table_schema",
+        dataset_id=DATASET_NAME,
+        table_id="test_table",
+        schema_fields_updates=[
+            {"name": "emp_name", "description": "Name of employee"},
+            {"name": "salary", "description": "Monthly salary in USD"},
+        ],
+    )
+    # [END howto_operator_bigquery_update_table_schema]
+
     # [START howto_operator_bigquery_delete_table]
     delete_table = BigQueryDeleteTableOperator(
         task_id="delete_table",
@@ -216,6 +229,7 @@ with models.DAG(
             delete_view,
         ]
         >> upsert_table
+        >> update_table_schema
         >> delete_materialized_view
         >> delete_table
         >> delete_dataset
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 044c014..e36baf5 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1374,6 +1374,101 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
     @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        include_policy_tags: bool,
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable; trying to change them will cause
+        an exception.
+        If a new field is included, it will be inserted, which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param include_policy_tags: If set to True, policy tags will be included in
+            the update request, which requires special permissions even if unchanged;
+            see https://cloud.google.com/bigquery/docs/column-level-security#roles
+        :type include_policy_tags: bool
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. See
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_fields_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from the current_schema;
+            # as of Python 3.6, dicts retain insertion order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field; iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key, we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field; just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        def _remove_policy_tags(schema: List[Dict[str, Any]]):
+            for field in schema:
+                if "policyTags" in field:
+                    del field["policyTags"]
+                if "fields" in field:
+                    _remove_policy_tags(field["fields"])
+
+        current_table_schema = self.get_schema(
+            dataset_id=dataset_id, table_id=table_id, project_id=project_id
+        )["fields"]
+        new_schema = _build_new_schema(current_table_schema, schema_fields_updates)
+
+        if not include_policy_tags:
+            _remove_policy_tags(new_schema)
+
+        self.update_table(
+            table_resource={"schema": {"fields": new_schema}},
+            fields=["schema"],
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=table_id,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
     def poll_job_complete(
         self,
         job_id: str,
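
As an aside, the merge semantics implemented by _build_new_schema above can be
sketched standalone; this is an illustrative sketch only, and the field names
in it are made up, not taken from any real table:

    # Standalone sketch of the patch-merge semantics of _build_new_schema.
    from copy import deepcopy
    from typing import Any, Dict, List

    def merge_schema(current: List[Dict[str, Any]], updates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        updates_by_name = {f["name"]: f for f in deepcopy(updates)}
        merged = {f["name"]: f for f in deepcopy(current)}  # dicts keep insertion order
        for name, patch in updates_by_name.items():
            if name in merged:
                if "fields" in patch:
                    # RECORD sub-fields are patched recursively
                    patch["fields"] = merge_schema(merged[name]["fields"], patch["fields"])
                merged[name].update(patch)
            else:
                merged[name] = patch  # new field: taken as-is
        return list(merged.values())

    current = [{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}]
    print(merge_schema(current, [{"name": "emp_name", "description": "Name of employee"}]))
    # [{'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED', 'description': 'Name of employee'}]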
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 67e38f4..1c2cbed 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2039,6 +2039,118 @@ class BigQueryUpsertTableOperator(BaseOperator):
         )
 
 
+class BigQueryUpdateTableSchemaOperator(BaseOperator):
+    """
+    Update BigQuery Table Schema.
+    Updates fields on a table schema based on the contents of the supplied
+    schema_fields_updates parameter. The supplied schema does not need to be
+    complete: if a field already exists in the schema, you only need to supply
+    the keys and values for the items you want to patch; just ensure the "name" key is set.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryUpdateTableSchemaOperator`
+
+    :param schema_fields_updates: a partial schema resource. See
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields_updates=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "policyTags": {'names': 
['some_new_policy_tag']},},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields_updates: List[dict]
+    :param include_policy_tags: (Optional) If set to True, policy tags will be included in
+        the update request, which requires special permissions even if unchanged (default False);
+        see https://cloud.google.com/bigquery/docs/column-level-security#roles
+    :type include_policy_tags: bool
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param table_id: The table ID of the requested table. (templated)
+    :type table_id: str
+    :param project_id: The name of the project where we want to update the table.
+        You don't need to provide it if projectId is set in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields_updates',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields_updates": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        schema_fields_updates: List[Dict[str, Any]],
+        include_policy_tags: bool = False,
+        dataset_id: Optional[str] = None,
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields_updates = schema_fields_updates
+        self.include_policy_tags = include_policy_tags
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        return bq_hook.update_table_schema(
+            schema_fields_updates=self.schema_fields_updates,
+            include_policy_tags=self.include_policy_tags,
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )
+
+
 # pylint: disable=too-many-arguments
 class BigQueryInsertJobOperator(BaseOperator):
     """
diff --git a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
index 7fc2780..b99971a 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
@@ -245,6 +245,23 @@ in the given dataset.
     :start-after: [START howto_operator_bigquery_upsert_table]
     :end-before: [END howto_operator_bigquery_upsert_table]
 
+.. _howto/operator:BigQueryUpdateTableSchemaOperator:
+
+Update table schema
+"""""""""""""""""""
+
+To update the schema of a table you can use
+:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator`.
+
+This operator updates the schema field values supplied, while leaving the rest unchanged.
+This is useful, for instance, for setting new field descriptions on an existing table schema.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_bigquery_update_table_schema]
+    :end-before: [END howto_operator_bigquery_update_table_schema]
+
 .. _howto/operator:BigQueryDeleteTableOperator:
 
 Delete table
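
When the policy tags themselves are being changed, include_policy_tags must be
set to True, or the operator strips policyTags from the update request before
sending it. A hedged sketch; the task and table ids are placeholders, the
taxonomy resource name is a made-up example, and the caller needs the
column-level-security roles linked above:

    update_policy_tags = BigQueryUpdateTableSchemaOperator(
        task_id="update_policy_tags",
        dataset_id="my_dataset",  # placeholder
        table_id="my_table",      # placeholder
        include_policy_tags=True,  # otherwise policyTags are removed before the update
        schema_fields_updates=[
            {
                "name": "salary",
                "policyTags": {"names": ["projects/p/locations/us/taxonomies/t/policyTags/pt"]},  # placeholder
            },
        ],
    )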
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index f7b09be..7905741 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -676,6 +676,178 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
         assert "fields" in result
         assert len(result["fields"]) == 2
 
+    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.update_table')
+    def test_update_table_schema_with_policy_tags(self, mock_update, mock_get_schema):
+        mock_get_schema.return_value = {
+            "fields": [
+                {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
+                {
+                    'name': 'salary',
+                    'type': 'INTEGER',
+                    'mode': 'REQUIRED',
+                    'policyTags': {'names': ['sensitive']},
+                },
+                {'name': 'not_changed', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                {
+                    'name': 'subrecord',
+                    'type': 'RECORD',
+                    'mode': 'REQUIRED',
+                    'fields': [
+                        {
+                            'name': 'field_1',
+                            'type': 'STRING',
+                            'mode': 'REQUIRED',
+                            'policyTags': {'names': ['sensitive']},
+                        },
+                    ],
+                },
+            ]
+        }
+
+        schema_fields_updates = [
+            {'name': 'emp_name', 'description': 'Name of employee', 'policyTags': {'names': ['sensitive']}},
+            {
+                'name': 'salary',
+                'description': 'Monthly salary in USD',
+                'policyTags': {},
+            },
+            {
+                'name': 'subrecord',
+                'description': 'Some Desc',
+                'fields': [
+                    {'name': 'field_1', 'description': 'Some nested desc'},
+                ],
+            },
+        ]
+
+        expected_result_schema = {
+            'fields': [
+                {
+                    'name': 'emp_name',
+                    'type': 'STRING',
+                    'mode': 'REQUIRED',
+                    'description': 'Name of employee',
+                    'policyTags': {'names': ['sensitive']},
+                },
+                {
+                    'name': 'salary',
+                    'type': 'INTEGER',
+                    'mode': 'REQUIRED',
+                    'description': 'Monthly salary in USD',
+                    'policyTags': {},
+                },
+                {'name': 'not_changed', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                {
+                    'name': 'subrecord',
+                    'type': 'RECORD',
+                    'mode': 'REQUIRED',
+                    'description': 'Some Desc',
+                    'fields': [
+                        {
+                            'name': 'field_1',
+                            'type': 'STRING',
+                            'mode': 'REQUIRED',
+                            'description': 'Some nested desc',
+                            'policyTags': {'names': ['sensitive']},
+                        }
+                    ],
+                },
+            ]
+        }
+
+        self.hook.update_table_schema(
+            schema_fields_updates=schema_fields_updates,
+            include_policy_tags=True,
+            dataset_id=DATASET_ID,
+            table_id=TABLE_ID,
+        )
+
+        mock_update.assert_called_once_with(
+            dataset_id=DATASET_ID,
+            table_id=TABLE_ID,
+            project_id=PROJECT_ID,
+            table_resource={'schema': expected_result_schema},
+            fields=['schema'],
+        )
+
+    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.update_table')
+    def test_update_table_schema_without_policy_tags(self, mock_update, mock_get_schema):
+        mock_get_schema.return_value = {
+            "fields": [
+                {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
+                {'name': 'salary', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                {'name': 'not_changed', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                {
+                    'name': 'subrecord',
+                    'type': 'RECORD',
+                    'mode': 'REQUIRED',
+                    'fields': [
+                        {'name': 'field_1', 'type': 'STRING', 'mode': 'REQUIRED'},
+                    ],
+                },
+            ]
+        }
+
+        schema_fields_updates = [
+            {'name': 'emp_name', 'description': 'Name of employee'},
+            {
+                'name': 'salary',
+                'description': 'Monthly salary in USD',
+                'policyTags': {'names': ['sensitive']},
+            },
+            {
+                'name': 'subrecord',
+                'description': 'Some Desc',
+                'fields': [
+                    {'name': 'field_1', 'description': 'Some nested desc'},
+                ],
+            },
+        ]
+
+        expected_result_schema = {
+            'fields': [
+                {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED', 'description': 'Name of employee'},
+                {
+                    'name': 'salary',
+                    'type': 'INTEGER',
+                    'mode': 'REQUIRED',
+                    'description': 'Monthly salary in USD',
+                },
+                {'name': 'not_changed', 'type': 'INTEGER', 'mode': 'REQUIRED'},
+                {
+                    'name': 'subrecord',
+                    'type': 'RECORD',
+                    'mode': 'REQUIRED',
+                    'description': 'Some Desc',
+                    'fields': [
+                        {
+                            'name': 'field_1',
+                            'type': 'STRING',
+                            'mode': 'REQUIRED',
+                            'description': 'Some nested desc',
+                        }
+                    ],
+                },
+            ]
+        }
+
+        self.hook.update_table_schema(
+            schema_fields_updates=schema_fields_updates,
+            include_policy_tags=False,
+            dataset_id=DATASET_ID,
+            table_id=TABLE_ID,
+        )
+
+        mock_update.assert_called_once_with(
+            dataset_id=DATASET_ID,
+            table_id=TABLE_ID,
+            project_id=PROJECT_ID,
+            table_resource={'schema': expected_result_schema},
+            fields=['schema'],
+        )
+
    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
     def test_invalid_source_format(self, mock_get_service):
         with pytest.raises(
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 61030f6..801034c 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -46,6 +46,7 @@ from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryPatchDatasetOperator,
     BigQueryUpdateDatasetOperator,
     BigQueryUpdateTableOperator,
+    BigQueryUpdateTableSchemaOperator,
     BigQueryUpsertTableOperator,
     BigQueryValueCheckOperator,
 )
@@ -290,6 +291,36 @@ class TestBigQueryUpdateTableOperator(unittest.TestCase):
         )
 
 
+class TestBigQueryUpdateTableSchemaOperator(unittest.TestCase):
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_execute(self, mock_hook):
+
+        schema_field_updates = [
+            {
+                'name': 'emp_name',
+                'description': 'Name of employee',
+            }
+        ]
+
+        operator = BigQueryUpdateTableSchemaOperator(
+            schema_fields_updates=schema_field_updates,
+            include_policy_tags=False,
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+        operator.execute(None)
+
+        mock_hook.return_value.update_table_schema.assert_called_once_with(
+            schema_fields_updates=schema_field_updates,
+            include_policy_tags=False,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+
class TestBigQueryPatchDatasetOperator(unittest.TestCase):
    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
     def test_execute(self, mock_hook):

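The same update can also be driven through the hook directly, for example from
custom code; a minimal sketch assuming the default connection id and project,
with placeholder dataset and table names:

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    hook = BigQueryHook(gcp_conn_id="google_cloud_default")
    hook.update_table_schema(
        schema_fields_updates=[{"name": "emp_name", "description": "Name of employee"}],
        include_policy_tags=False,
        dataset_id="my_dataset",  # placeholder
        table_id="my_table",      # placeholder
    )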