marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614317645



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema.
+    Updates fields on a table schema based on the contents of the supplied schema
+    parameter. The supplied schema does not need to be complete; if the field
+    already exists in the schema, you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: list
+    :param project_id: The name of the project where we want to update the dataset.
+        It is not required if projectId is set in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       ```suggestion
       def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
           """
           Updates the content of "old_schema" with
           the fields from new_schema. Makes changes
           in place and hence has no return value.
           Works recursively for sub-records.
   
           Start by turning the schema list of fields into
           a dict keyed on the field name for both the old
           and the new schema.
           
           :param old_schema: Old schema...
           :type old_schema: dict
           """
   ```
   
    This way information about the function itself will be rendered on the Airflow website and users will have more information about it; take a look [at an example here](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/bigquery/index.html#airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema)
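
    For concreteness, here is roughly how the helper would read with the suggested type hints and docstring folded in. The merge logic is the one from the PR; the `_SchemaPatchMixin` wrapper and the small demo at the end are only stand-ins so the sketch is self-contained:

```python
from typing import Any, Dict


class _SchemaPatchMixin:
    """Stand-in container, only here so the suggested signature has context."""

    @classmethod
    def _patch_schema(cls, old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> None:
        """
        Updates the content of ``old_schema`` with the fields from ``new_schema``.
        Makes changes in place and hence has no return value.
        Works recursively for sub-records.

        :param old_schema: the full existing table schema, modified in place
        :type old_schema: dict
        :param new_schema: the partial schema whose fields are merged in
        :type new_schema: dict
        """
        # Turn the list of fields into a dict keyed on the field name
        # for both the old and the new schema.
        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
        old_fields = {field["name"]: field for field in old_schema["fields"]}

        for field_key in new_fields:
            if field_key in old_fields:
                old_field = old_fields[field_key]
                new_field = new_fields[field_key]
                # Recurse into nested RECORD fields before merging the remaining keys.
                if "fields" in new_field:
                    cls._patch_schema(old_field, new_field)
                    del new_field["fields"]
                old_field.update(new_field)
            else:
                # Field didn't exist yet, so add it as a new field.
                old_schema["fields"].append(new_fields[field_key])


# Tiny demo of the in-place merge:
old = {"fields": [{"name": "emp_name", "type": "STRING", "description": "old"}]}
new = {"fields": [{"name": "emp_name", "description": "Some New Description"}]}
_SchemaPatchMixin._patch_schema(old, new)
# old["fields"][0] is now
# {"name": "emp_name", "type": "STRING", "description": "Some New Description"}
```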

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema.
+    Updates fields on a table schema based on the contents of the supplied schema
+    parameter. The supplied schema does not need to be complete; if the field
+    already exists in the schema, you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: list
+    :param project_id: The name of the project where we want to update the dataset.
+        It is not required if projectId is set in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
    BigQueryHook has `patch_table`, `patch_dataset`, `get_schema` methods... maybe the function `_patch_schema` should belong to the BigQueryHook class.
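
    For illustration, a rough sketch of what that refactor could look like. The method name `patch_table_schema` is hypothetical (it is not an existing BigQueryHook method and not part of this PR); it simply reuses `get_schema` and `update_table` the same way the operator's `execute` does now, with `_patch_schema` assumed to be moved onto the hook as well:

```python
from typing import Any, Dict, List, Optional


# Hypothetical hook method -- the name and its placement on BigQueryHook are
# assumptions for illustration only, not an existing API.
def patch_table_schema(
    self,
    schema_fields: List[Dict[str, Any]],
    dataset_id: str,
    table_id: str,
    project_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Merge partial ``schema_fields`` into the table schema and push the update."""
    # Fetch the current schema, merge the partial fields into it in place,
    # then write only the "schema" field back to the table.
    table_schema = self.get_schema(
        dataset_id=dataset_id, table_id=table_id, project_id=project_id
    )
    self._patch_schema(old_schema=table_schema, new_schema={"fields": schema_fields})
    return self.update_table(
        table_resource={"schema": table_schema},
        fields=["schema"],
        dataset_id=dataset_id,
        table_id=table_id,
        project_id=project_id,
    )
```

    The operator's `execute` would then collapse to a single `bq_hook.patch_table_schema(...)` call.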



