vincbeck commented on code in PR #66721:
URL: https://github.com/apache/airflow/pull/66721#discussion_r3222039706
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/glue_catalog.py:
##########
@@ -336,3 +336,67 @@ def execute(self, context: Context) -> None:
else:
raise
self.log.info("Partition created.")
+
+
+class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]):
+ """
+ Delete one or more partitions from an AWS Glue Data Catalog table.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator`
+
+ :param database_name: The name of the database. (templated)
+ :param table_name: The name of the table. (templated)
+ :param partitions_to_delete: List of partition value lists to delete.
(templated)
+ :param catalog_id: The ID of the Data Catalog. Defaults to the account ID.
(templated)
+ """
+
+ aws_hook_class = AwsBaseHook
+ template_fields: tuple[str, ...] = (
+ *AwsBaseOperator.template_fields,
+ "database_name",
+ "table_name",
+ "catalog_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ database_name: str,
+ table_name: str,
+ partitions_to_delete: list[dict[str, list[str]]],
+ catalog_id: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.database_name = database_name
+ self.table_name = table_name
+ self.partitions_to_delete = partitions_to_delete
+ self.catalog_id = catalog_id
+
+ @property
+ def _hook_parameters(self) -> dict[str, Any]:
+ return {**super()._hook_parameters, "client_type": "glue"}
+
+ def execute(self, context: Context) -> list[dict[str, Any]]:
+ self.log.info(
+ "Deleting %d partitions from %s.%s",
+ len(self.partitions_to_delete),
+ self.database_name,
+ self.table_name,
+ )
+ kwargs: dict[str, Any] = prune_dict(
+ {
+ "DatabaseName": self.database_name,
+ "TableName": self.table_name,
+ "PartitionsToDelete": self.partitions_to_delete,
+ "CatalogId": self.catalog_id,
+ }
+ )
+ response = self.hook.conn.batch_delete_partition(**kwargs)
+ errors = response.get("Errors", [])
+ if errors:
+ self.log.warning("Errors deleting partitions: %s", errors)
Review Comment:
In case of errors, should the operator fail? I am also surprised: what kind of
error can be returned? Usually, if there is an error, boto3 raises an
exception. Here, would the call succeed but still return a list of
errors?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]